You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2023/04/10 22:44:49 UTC

[pulsar] branch master updated: [feat] PIP-257: Add AuthenticationProviderOpenID (#19849)

This is an automated email from the ASF dual-hosted git repository.

mmarshall pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 11751b7da23 [feat] PIP-257: Add AuthenticationProviderOpenID (#19849)
11751b7da23 is described below

commit 11751b7da2316516a2c18c11b3bd4011641b93f4
Author: Michael Marshall <mm...@apache.org>
AuthorDate: Mon Apr 10 17:44:41 2023 -0500

    [feat] PIP-257: Add AuthenticationProviderOpenID (#19849)
    
    PIP: #19771
    
    ### Motivation
    
    This is the primary PR for PIP 257 (#19771). It adds an OpenID Connect `AuthenticationProvider` implementation. The implementation is intended to be compliant with the OpenID Specs defined here: https://openid.net/developers/specs/. Specifically, we implement the following two specifications (Core and Discovery):
    
    > * [OpenID Connect Core](https://openid.net/specs/openid-connect-core-1_0.html) – Defines the core OpenID Connect functionality: authentication built on top of OAuth 2.0 and the use of claims to communicate information about the End-User
    > * [OpenID Connect Discovery](https://openid.net/specs/openid-connect-discovery-1_0.html) – Defines how clients dynamically discover information about OpenID Providers
    
    ### Modifications
    
    * Add new module `pulsar-broker-auth-oidc`
    * Add implementation that relies on auth0 client libraries to verify the signature and claims of the JWT
    * Use async http client for all http requests
    * Cache the provider metadata and the JWKS results
    * Support different types of `FallbackDiscoveryMode`s, as documented in the code. Essentially, this setting allows users to more easily integrate with k8s. We need this coupling with kubernetes to deal with some of the nuances of the k8s implementation. Note that this part of the code is experimental and is subject to change as requirements and cloud provider implementations change. One important reason we use the k8s client is because the API Server requires special configuration for [...]
    * Add metrics to help quantify success and failure. (I had thought I would add audit logging, but that is an independent feature that we can add to the Pulsar framework. It seems outside the scope of an Authentication Provider implementation to implement this feature.)
    
    ### Verifying this change
    
    There are many new tests to cover this new implementation. Some of the tests are unit tests, while others are integration tests that rely on WireMock to return the public key information.
    
    ### Documentation
    
    - [x] `doc-required`
    
    This feature will need new docs.
    
    ### Matching PR in forked repository
    
    PR in forked repository: https://github.com/michaeljmarshall/pulsar/pull/35
---
 distribution/server/pom.xml                        |   6 +
 distribution/server/src/assemble/LICENSE.bin.txt   |   3 +
 pom.xml                                            |   2 +
 pulsar-broker-auth-oidc/pom.xml                    | 179 +++++++
 .../oidc/AuthenticationExceptionCode.java          |  38 ++
 .../oidc/AuthenticationProviderOpenID.java         | 493 +++++++++++++++++++
 .../oidc/AuthenticationStateOpenID.java            |  96 ++++
 .../broker/authentication/oidc/ConfigUtils.java    | 143 ++++++
 .../authentication/oidc/FallbackDiscoveryMode.java |  61 +++
 .../broker/authentication/oidc/JwksCache.java      | 202 ++++++++
 .../oidc/OpenIDProviderMetadata.java               |  53 +++
 .../oidc/OpenIDProviderMetadataCache.java          | 232 +++++++++
 .../broker/authentication/oidc/package-info.java   |  19 +
 ...uthenticationProviderOpenIDIntegrationTest.java | 530 +++++++++++++++++++++
 .../oidc/AuthenticationProviderOpenIDTest.java     | 387 +++++++++++++++
 .../oidc/AuthenticationStateOpenIDTest.java        |  59 +++
 .../authentication/oidc/ConfigUtilsTest.java       | 151 ++++++
 .../src/test/java/resources/fakeKubeConfig.yaml    |  37 ++
 18 files changed, 2691 insertions(+)

diff --git a/distribution/server/pom.xml b/distribution/server/pom.xml
index 2043da516cf..6b225bfc00c 100644
--- a/distribution/server/pom.xml
+++ b/distribution/server/pom.xml
@@ -46,6 +46,12 @@
       <version>${project.version}</version>
     </dependency>
 
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker-auth-oidc</artifactId>
+      <version>${project.version}</version>
+    </dependency>
+
     <dependency>
       <groupId>${project.groupId}</groupId>
       <artifactId>pulsar-broker-auth-sasl</artifactId>
diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 9205e072f7e..4cb1d1544a6 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -519,6 +519,9 @@ MIT License
     - org.checkerframework-checker-qual-3.12.0.jar
  * oshi
     - com.github.oshi-oshi-core-java11-6.4.0.jar
+ * Auth0, Inc.
+    - com.auth0-java-jwt-4.3.0.jar
+    - com.auth0-jwks-rsa-0.22.0.jar
 Protocol Buffers License
  * Protocol Buffers
    - com.google.protobuf-protobuf-java-3.19.6.jar -- ../licenses/LICENSE-protobuf.txt
diff --git a/pom.xml b/pom.xml
index d6d7fdf4468..389d9f6f834 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2159,6 +2159,7 @@ flexible messaging model and an intuitive client API.</description>
         <module>pulsar-broker-auth-athenz</module>
         <module>pulsar-client-auth-athenz</module>
         <module>pulsar-sql</module>
+        <module>pulsar-broker-auth-oidc</module>
         <module>pulsar-broker-auth-sasl</module>
         <module>pulsar-client-auth-sasl</module>
         <module>pulsar-config-validation</module>
@@ -2217,6 +2218,7 @@ flexible messaging model and an intuitive client API.</description>
         <module>pulsar-websocket</module>
         <module>pulsar-proxy</module>
         <module>pulsar-testclient</module>
+        <module>pulsar-broker-auth-oidc</module>
         <module>pulsar-broker-auth-sasl</module>
         <module>pulsar-client-auth-sasl</module>
         <module>pulsar-config-validation</module>
diff --git a/pulsar-broker-auth-oidc/pom.xml b/pulsar-broker-auth-oidc/pom.xml
new file mode 100644
index 00000000000..9ad0363775a
--- /dev/null
+++ b/pulsar-broker-auth-oidc/pom.xml
@@ -0,0 +1,179 @@
+<?xml version="1.0"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+<project
+  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
+  xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
+  <modelVersion>4.0.0</modelVersion>
+  <parent>
+    <groupId>org.apache.pulsar</groupId>
+    <artifactId>pulsar</artifactId>
+    <version>3.0.0-SNAPSHOT</version>
+  </parent>
+
+  <artifactId>pulsar-broker-auth-oidc</artifactId>
+  <packaging>jar</packaging>
+  <description>Open ID Connect authentication plugin for broker</description>
+
+  <properties>
+    <jsonwebtoken.version>0.11.5</jsonwebtoken.version>
+  </properties>
+
+  <dependencies>
+
+    <dependency>
+      <groupId>${project.groupId}</groupId>
+      <artifactId>pulsar-broker-common</artifactId>
+      <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>io.grpc</groupId>
+          <artifactId>*</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>com.auth0</groupId>
+      <artifactId>java-jwt</artifactId>
+      <version>4.3.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.auth0</groupId>
+      <artifactId>jwks-rsa</artifactId>
+      <version>0.22.0</version>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.ben-manes.caffeine</groupId>
+      <artifactId>caffeine</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>org.asynchttpclient</groupId>
+      <artifactId>async-http-client</artifactId>
+    </dependency>
+
+    <dependency>
+      <groupId>io.kubernetes</groupId>
+      <artifactId>client-java</artifactId>
+      <version>${kubernetesclient.version}</version>
+      <exclusions>
+        <!-- exclude prometheus http server since we don't export metrics from the client -->
+        <exclusion>
+          <groupId>io.prometheus</groupId>
+          <artifactId>simpleclient_httpserver</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+
+    <dependency>
+      <groupId>io.jsonwebtoken</groupId>
+      <artifactId>jjwt-api</artifactId>
+      <version>${jsonwebtoken.version}</version>
+      <scope>test</scope>
+    </dependency>
+    <dependency>
+      <groupId>io.jsonwebtoken</groupId>
+      <artifactId>jjwt-impl</artifactId>
+      <version>${jsonwebtoken.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>com.github.tomakehurst</groupId>
+      <artifactId>wiremock-jre8</artifactId>
+      <version>${wiremock.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+  </dependencies>
+
+  <profiles>
+    <profile>
+      <!-- enables builds with -Dmaven.test.skip=true -->
+      <id>test-jar-dependencies</id>
+      <activation>
+        <property>
+          <name>maven.test.skip</name>
+          <value>!true</value>
+        </property>
+      </activation>
+      <dependencies>
+        <dependency>
+          <groupId>${project.groupId}</groupId>
+          <artifactId>pulsar-broker</artifactId>
+          <version>${project.version}</version>
+          <scope>test</scope>
+          <type>test-jar</type>
+        </dependency>
+      </dependencies>
+    </profile>
+  </profiles>
+
+
+  <build>
+    <plugins>
+      <plugin>
+        <groupId>org.gaul</groupId>
+        <artifactId>modernizer-maven-plugin</artifactId>
+        <configuration>
+          <failOnViolations>true</failOnViolations>
+          <javaVersion>8</javaVersion>
+        </configuration>
+        <executions>
+          <execution>
+            <id>modernizer</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>modernizer</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-checkstyle-plugin</artifactId>
+        <executions>
+          <execution>
+            <id>checkstyle</id>
+            <phase>verify</phase>
+            <goals>
+              <goal>check</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-surefire-plugin</artifactId>
+        <configuration>
+          <environmentVariables>
+            <KUBECONFIG_TEMPLATE>src/test/java/resources/fakeKubeConfig.yaml</KUBECONFIG_TEMPLATE>
+            <KUBECONFIG>${project.basedir}/target/kubeconfig.yaml</KUBECONFIG>
+          </environmentVariables>
+        </configuration>
+      </plugin>
+    </plugins>
+  </build>
+</project>
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationExceptionCode.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationExceptionCode.java
new file mode 100644
index 00000000000..5f89f5f1370
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationExceptionCode.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+/**
+ * Enum used to classify the types of exceptions encountered
+ * when attempting JWT verification.
+ */
+public enum AuthenticationExceptionCode {
+    UNSUPPORTED_ISSUER, // JWT has no "iss" claim, or its issuer is not allowed and no fallback discovery mode applies
+    UNSUPPORTED_ALGORITHM,
+    ISSUER_MISMATCH,
+    ALGORITHM_MISMATCH, // the "alg" of the retrieved JWK differs from the "alg" of the JWT
+    INVALID_PUBLIC_KEY, // the JWK could not be turned into a public key (InvalidPublicKeyException)
+    ERROR_RETRIEVING_PROVIDER_METADATA,
+    ERROR_RETRIEVING_PUBLIC_KEY,
+    ERROR_DECODING_JWT, // the token is missing/null or cannot be parsed as a JWT
+    ERROR_VERIFYING_JWT,
+    ERROR_VERIFYING_JWT_SIGNATURE,
+    INVALID_JWT_CLAIM,
+    EXPIRED_JWT, // NOTE(review): presumably recorded for TokenExpiredException - confirm in the verifier code
+}
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
new file mode 100644
index 00000000000..ae4774b6f6b
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenID.java
@@ -0,0 +1,493 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsBoolean;
+import static org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
+import static org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsSet;
+import static org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsString;
+import com.auth0.jwk.InvalidPublicKeyException;
+import com.auth0.jwk.Jwk;
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.JWTVerifier;
+import com.auth0.jwt.RegisteredClaims;
+import com.auth0.jwt.algorithms.Algorithm;
+import com.auth0.jwt.exceptions.AlgorithmMismatchException;
+import com.auth0.jwt.exceptions.InvalidClaimException;
+import com.auth0.jwt.exceptions.JWTDecodeException;
+import com.auth0.jwt.exceptions.JWTVerificationException;
+import com.auth0.jwt.exceptions.SignatureVerificationException;
+import com.auth0.jwt.exceptions.TokenExpiredException;
+import com.auth0.jwt.interfaces.Claim;
+import com.auth0.jwt.interfaces.DecodedJWT;
+import com.auth0.jwt.interfaces.Verification;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.util.Config;
+import io.netty.handler.ssl.SslContext;
+import io.netty.handler.ssl.SslContextBuilder;
+import java.io.File;
+import java.io.IOException;
+import java.net.SocketAddress;
+import java.security.PublicKey;
+import java.security.interfaces.ECPublicKey;
+import java.security.interfaces.RSAPublicKey;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationProvider;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.broker.authentication.metrics.AuthenticationMetrics;
+import org.apache.pulsar.common.api.AuthData;
+import org.asynchttpclient.AsyncHttpClient;
+import org.asynchttpclient.AsyncHttpClientConfig;
+import org.asynchttpclient.DefaultAsyncHttpClient;
+import org.asynchttpclient.DefaultAsyncHttpClientConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An {@link AuthenticationProvider} implementation that supports the usage of a JSON Web Token (JWT)
+ * for client authentication. This implementation retrieves the PublicKey from the JWT issuer (assuming the
+ * issuer is in the configured allowed list) and then uses that Public Key to verify the validity of the JWT's
+ * signature.
+ *
+ * The Public Keys for a given provider are cached based on certain configured parameters to improve performance.
+ * The tradeoff here is that the longer Public Keys are cached, the longer an invalidated token could be used. One way
+ * to ensure caches are cleared is to restart all brokers.
+ *
+ * Class is called from multiple threads. The implementation must be thread safe. This class expects to be loaded once
+ * and then called concurrently for each new connection. The cache is backed by a GuavaCachedJwkProvider, which is
+ * thread-safe.
+ *
+ * Supported algorithms are: RS256, RS384, RS512, ES256, ES384, ES512 where the naming conventions follow
+ * this RFC: https://datatracker.ietf.org/doc/html/rfc7518#section-3.1.
+ */
+public class AuthenticationProviderOpenID implements AuthenticationProvider {
+    private static final Logger log = LoggerFactory.getLogger(AuthenticationProviderOpenID.class);
+
+    private static final String SIMPLE_NAME = AuthenticationProviderOpenID.class.getSimpleName();
+
+    // Must match the value used by the OAuth2 Client Plugin.
+    private static final String AUTH_METHOD_NAME = "token";
+
+    // This is backed by an ObjectMapper, which is thread safe. It is an optimization
+    // to share this for decoding JWTs for all connections to this broker.
+    private final JWT jwtLibrary = new JWT();
+
+    private Set<String> issuers;
+
+    // This caches the map from Issuer URL to the jwks_uri served at the /.well-known/openid-configuration endpoint
+    private OpenIDProviderMetadataCache openIDProviderMetadataCache;
+
+    // A cache used to store the results of getting the JWKS from the jwks_uri for an issuer.
+    private JwksCache jwksCache;
+
+    private volatile AsyncHttpClient httpClient;
+
+    // A list of supported algorithms. This is the "alg" field on the JWT.
+    // Source for strings: https://datatracker.ietf.org/doc/html/rfc7518#section-3.1.
+    private static final String ALG_RS256 = "RS256";
+    private static final String ALG_RS384 = "RS384";
+    private static final String ALG_RS512 = "RS512";
+    private static final String ALG_ES256 = "ES256";
+    private static final String ALG_ES384 = "ES384";
+    private static final String ALG_ES512 = "ES512";
+
+    private long acceptedTimeLeewaySeconds; // set in initialize() from ACCEPTED_TIME_LEEWAY_SECONDS (default 0)
+    private FallbackDiscoveryMode fallbackDiscoveryMode; // set in initialize(); DISABLED unless configured
+    private String roleClaim = ROLE_CLAIM_DEFAULT; // name of the JWT claim read by getRole(); "sub" unless configured
+    private boolean isRoleClaimNotSubject; // true when roleClaim is anything other than "sub"
+
+    static final String ALLOWED_TOKEN_ISSUERS = "openIDAllowedTokenIssuers";
+    static final String ISSUER_TRUST_CERTS_FILE_PATH = "openIDTokenIssuerTrustCertsFilePath";
+    static final String FALLBACK_DISCOVERY_MODE = "openIDFallbackDiscoveryMode";
+    static final String ALLOWED_AUDIENCES = "openIDAllowedAudiences";
+    static final String ROLE_CLAIM = "openIDRoleClaim";
+    static final String ROLE_CLAIM_DEFAULT = "sub";
+    static final String ACCEPTED_TIME_LEEWAY_SECONDS = "openIDAcceptedTimeLeewaySeconds";
+    static final int ACCEPTED_TIME_LEEWAY_SECONDS_DEFAULT = 0;
+    static final String CACHE_SIZE = "openIDCacheSize";
+    static final int CACHE_SIZE_DEFAULT = 5;
+    static final String CACHE_REFRESH_AFTER_WRITE_SECONDS = "openIDCacheRefreshAfterWriteSeconds";
+    static final int CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT = 18 * 60 * 60;
+    static final String CACHE_EXPIRATION_SECONDS = "openIDCacheExpirationSeconds";
+    static final int CACHE_EXPIRATION_SECONDS_DEFAULT = 24 * 60 * 60;
+    static final String HTTP_CONNECTION_TIMEOUT_MILLIS = "openIDHttpConnectionTimeoutMillis";
+    static final int HTTP_CONNECTION_TIMEOUT_MILLIS_DEFAULT = 10_000;
+    static final String HTTP_READ_TIMEOUT_MILLIS = "openIDHttpReadTimeoutMillis";
+    static final int HTTP_READ_TIMEOUT_MILLIS_DEFAULT = 10_000;
+    static final String REQUIRE_HTTPS = "openIDRequireIssuersUseHttps";
+    static final boolean REQUIRE_HTTPS_DEFAULT = true;
+
+    // The list of audiences that are allowed to connect to this broker. A valid JWT must contain one of the audiences.
+    private String[] allowedAudiences;
+
+    /**
+     * Read the OpenID Connect settings from the broker configuration, then build the shared HTTP client
+     * and the provider-metadata and JWKS caches that depend on it.
+     *
+     * @param config - the broker configuration holding the {@code openID*} settings
+     * @throws IOException e.g. when the SSL context or the default Kubernetes client cannot be created
+     */
+    @Override
+    public void initialize(ServiceConfiguration config) throws IOException {
+        // Claim handling.
+        this.allowedAudiences = validateAllowedAudiences(getConfigValueAsSet(config, ALLOWED_AUDIENCES));
+        this.roleClaim = getConfigValueAsString(config, ROLE_CLAIM, ROLE_CLAIM_DEFAULT);
+        this.isRoleClaimNotSubject = !ROLE_CLAIM_DEFAULT.equals(this.roleClaim);
+        this.acceptedTimeLeewaySeconds =
+                getConfigValueAsInt(config, ACCEPTED_TIME_LEEWAY_SECONDS, ACCEPTED_TIME_LEEWAY_SECONDS_DEFAULT);
+
+        // Issuer handling. The issuer validation is told whether a fallback discovery mode is enabled.
+        final boolean httpsRequired = getConfigValueAsBoolean(config, REQUIRE_HTTPS, REQUIRE_HTTPS_DEFAULT);
+        final String discoveryModeName =
+                getConfigValueAsString(config, FALLBACK_DISCOVERY_MODE, FallbackDiscoveryMode.DISABLED.name());
+        this.fallbackDiscoveryMode = FallbackDiscoveryMode.valueOf(discoveryModeName);
+        final boolean fallbackEnabled = this.fallbackDiscoveryMode != FallbackDiscoveryMode.DISABLED;
+        this.issuers = validateIssuers(getConfigValueAsSet(config, ALLOWED_TOKEN_ISSUERS), httpsRequired,
+                fallbackEnabled);
+
+        // HTTP client settings.
+        final int connectTimeoutMillis =
+                getConfigValueAsInt(config, HTTP_CONNECTION_TIMEOUT_MILLIS, HTTP_CONNECTION_TIMEOUT_MILLIS_DEFAULT);
+        final int readTimeoutMillis =
+                getConfigValueAsInt(config, HTTP_READ_TIMEOUT_MILLIS, HTTP_READ_TIMEOUT_MILLIS_DEFAULT);
+        final String trustCertsPath = getConfigValueAsString(config, ISSUER_TRUST_CERTS_FILE_PATH, null);
+
+        // Only the trust store is customized; a null context is passed through when no path is configured.
+        final SslContext tlsContext = trustCertsPath == null
+                ? null
+                : SslContextBuilder.forClient().trustManager(new File(trustCertsPath)).build();
+
+        final DefaultAsyncHttpClientConfig.Builder httpConfigBuilder = new DefaultAsyncHttpClientConfig.Builder();
+        httpConfigBuilder.setConnectTimeout(connectTimeoutMillis);
+        httpConfigBuilder.setReadTimeout(readTimeoutMillis);
+        httpConfigBuilder.setSslContext(tlsContext);
+        final AsyncHttpClientConfig httpConfig = httpConfigBuilder.build();
+        httpClient = new DefaultAsyncHttpClient(httpConfig);
+
+        // The Kubernetes client is only created when a fallback discovery mode is enabled.
+        final ApiClient kubernetesClient;
+        if (fallbackEnabled) {
+            kubernetesClient = Config.defaultClient();
+        } else {
+            kubernetesClient = null;
+        }
+
+        this.openIDProviderMetadataCache = new OpenIDProviderMetadataCache(config, httpClient, kubernetesClient);
+        this.jwksCache = new JwksCache(config, httpClient, kubernetesClient);
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return AUTH_METHOD_NAME; // "token"; declared above as having to match the OAuth2 Client Plugin
+    }
+
+    /**
+     * Authenticate the token carried by the given {@link AuthenticationDataSource} and map the verified JWT
+     * to a role. Verification is delegated to {@link #authenticateTokenAsync(AuthenticationDataSource)}; the
+     * role is then read from the verified JWT with {@link #getRole(DecodedJWT)}.
+     *
+     * @param authData - the authData passed by the Pulsar Broker containing the token.
+     * @return a future holding the role read from the JWT (may be null when the role claim is absent) if the
+     *         JWT is authenticated, otherwise a failed future.
+     */
+    @Override
+    public CompletableFuture<String> authenticateAsync(AuthenticationDataSource authData) {
+        CompletableFuture<DecodedJWT> verifiedJwt = authenticateTokenAsync(authData);
+        return verifiedJwt.thenApply(jwt -> getRole(jwt));
+    }
+
+    /**
+     * Authenticate the parameterized {@link AuthenticationDataSource} and return the decoded JWT.
+     * @param authData - the authData containing the token.
+     * @return a completed future with the decoded JWT, if the JWT is authenticated. Otherwise, a failed future.
+     */
+    CompletableFuture<DecodedJWT> authenticateTokenAsync(AuthenticationDataSource authData) {
+        String token;
+        try {
+            token = AuthenticationProviderToken.getToken(authData);
+        } catch (AuthenticationException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            return CompletableFuture.failedFuture(e);
+        }
+        return authenticateToken(token)
+                .whenComplete((jwt, e) -> {
+                    if (jwt != null) {
+                        AuthenticationMetrics.authenticateSuccess(getClass().getSimpleName(), getAuthMethodName());
+                    }
+                    // Failure metrics are incremented within methods above
+                });
+    }
+
+    /**
+     * Get the role from a JWT at the configured role claim field.
+     * NOTE: does not do any verification of the JWT
+     *
+     * <p>The claim is first read as a string. If that yields nothing, it is read as a list of strings and the
+     * first element is used.
+     *
+     * @param jwt - token to get the role from
+     * @return the role, or null if no role could be read from the claim (including when reading the claim
+     *         throws a {@link JWTDecodeException}, which is logged)
+     */
+    String getRole(DecodedJWT jwt) {
+        try {
+            // Look the claim up once and reuse it; the local name avoids shadowing the roleClaim field.
+            Claim claim = jwt.getClaim(this.roleClaim);
+            if (claim.isNull()) {
+                return null;
+            }
+
+            String role = claim.asString();
+            if (role != null) {
+                // The role is non null only if the JSON node is a text field
+                return role;
+            }
+
+            List<String> roles = claim.asList(String.class);
+            if (roles == null || roles.isEmpty()) {
+                return null;
+            }
+            if (roles.size() > 1) {
+                log.debug("JWT for subject [{}] has multiple roles; using the first one.", jwt.getSubject());
+            }
+            return roles.get(0);
+        } catch (JWTDecodeException e) {
+            log.error("Exception while retrieving role from JWT", e);
+            return null;
+        }
+    }
+
+    /**
+     * Convert a JWT string into a {@link DecodedJWT}
+     * The benefit of using this method is that it utilizes the already instantiated {@link JWT} parser.
+     * WARNING: this method does not verify the authenticity of the token. It only decodes it.
+     *
+     * @param token - string JWT to be decoded
+     * @return a decoded JWT
+     * @throws AuthenticationException if the token string is null or if any part of the token contains
+     *         an invalid jwt or JSON format of each of the jwt parts.
+     */
+    DecodedJWT decodeJWT(String token) throws AuthenticationException {
+        if (token == null) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            throw new AuthenticationException("Invalid token: cannot be null");
+        }
+        try {
+            return jwtLibrary.decodeJwt(token);
+        } catch (JWTDecodeException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            throw new AuthenticationException("Unable to decode JWT: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Authenticate the parameterized JWT: decode it, resolve the JWK for its issuer, check that the JWK and
+     * JWT agree on the algorithm, and finally verify the JWT with the JWK's public key.
+     *
+     * <p>Each failure is counted once in the failure metrics. Decoding failures are counted by
+     * {@link #decodeJWT(String)} itself, so they are not counted again here.
+     *
+     * @param token - a nonnull JWT to authenticate
+     * @return a fully authenticated JWT, or a future failed with an AuthenticationException if the JWT is
+     *         proven to be invalid in any way
+     */
+    private CompletableFuture<DecodedJWT> authenticateToken(String token) {
+        if (token == null) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            return CompletableFuture.failedFuture(new AuthenticationException("JWT cannot be null"));
+        }
+        final DecodedJWT jwt;
+        try {
+            jwt = decodeJWT(token);
+        } catch (AuthenticationException e) {
+            // decodeJWT already recorded ERROR_DECODING_JWT before throwing; incrementing again here
+            // would count the same failure twice.
+            return CompletableFuture.failedFuture(e);
+        }
+        return verifyIssuerAndGetJwk(jwt)
+                .thenCompose(jwk -> {
+                    try {
+                        // NOTE(review): this dereferences jwt.getAlgorithm(); confirm a decoded JWT always
+                        // carries an "alg" header, otherwise this surfaces as a NullPointerException.
+                        if (!jwt.getAlgorithm().equals(jwk.getAlgorithm())) {
+                            incrementFailureMetric(AuthenticationExceptionCode.ALGORITHM_MISMATCH);
+                            return CompletableFuture.failedFuture(
+                                    new AuthenticationException("JWK's alg [" + jwk.getAlgorithm()
+                                            + "] does not match JWT's alg [" + jwt.getAlgorithm() + "]"));
+                        }
+                        // Verify the JWT signature
+                        // Throws exception if any verification check fails
+                        return CompletableFuture
+                                .completedFuture(verifyJWT(jwk.getPublicKey(), jwt.getAlgorithm(), jwt));
+                    } catch (InvalidPublicKeyException e) {
+                        incrementFailureMetric(AuthenticationExceptionCode.INVALID_PUBLIC_KEY);
+                        return CompletableFuture.failedFuture(
+                                new AuthenticationException("Invalid public key: " + e.getMessage()));
+                    } catch (AuthenticationException e) {
+                        return CompletableFuture.failedFuture(e);
+                    }
+                });
+    }
+
+    /**
+     * Verify the JWT's issuer (iss) claim is one of the allowed issuers and then retrieve the JWK from the issuer. If
+     * not, see {@link FallbackDiscoveryMode} for the fallback behavior.
+     * @param jwt - the token to use to discover the issuer's JWKS URI, which is then used to retrieve the issuer's
+     *            current public keys.
+     * @return a JWK that can be used to verify the JWT's signature
+     */
+    private CompletableFuture<Jwk> verifyIssuerAndGetJwk(DecodedJWT jwt) {
+        if (jwt.getIssuer() == null) {
+            incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ISSUER);
+            return CompletableFuture.failedFuture(new AuthenticationException("Issuer cannot be null"));
+        } else if (this.issuers.contains(jwt.getIssuer())) {
+            // Retrieve the metadata: https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata
+            return openIDProviderMetadataCache.getOpenIDProviderMetadataForIssuer(jwt.getIssuer())
+                    .thenCompose(metadata -> jwksCache.getJwk(metadata.getJwksUri(), jwt.getKeyId()));
+        } else if (fallbackDiscoveryMode == FallbackDiscoveryMode.KUBERNETES_DISCOVER_TRUSTED_ISSUER) {
+            return openIDProviderMetadataCache.getOpenIDProviderMetadataForKubernetesApiServer(jwt.getIssuer())
+                    .thenCompose(metadata ->
+                            openIDProviderMetadataCache.getOpenIDProviderMetadataForIssuer(metadata.getIssuer()))
+                    .thenCompose(metadata -> jwksCache.getJwk(metadata.getJwksUri(), jwt.getKeyId()));
+        } else if (fallbackDiscoveryMode == FallbackDiscoveryMode.KUBERNETES_DISCOVER_PUBLIC_KEYS) {
+            return openIDProviderMetadataCache.getOpenIDProviderMetadataForKubernetesApiServer(jwt.getIssuer())
+                    .thenCompose(__ -> jwksCache.getJwkFromKubernetesApiServer(jwt.getKeyId()));
+        } else {
+            incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ISSUER);
+            return CompletableFuture
+                    .failedFuture(new AuthenticationException("Issuer not allowed: " + jwt.getIssuer()));
+        }
+    }
+
+    /**
+     * Create a new {@link AuthenticationStateOpenID} bound to this provider for the given connection.
+     * The {@code authData} argument is not used here; only the remote address and the SSL session are
+     * handed to the new state object.
+     */
+    @Override
+    public AuthenticationState newAuthState(AuthData authData, SocketAddress remoteAddress, SSLSession sslSession)
+            throws AuthenticationException {
+        return new AuthenticationStateOpenID(this, remoteAddress, sslSession);
+    }
+
+    /**
+     * Close the HTTP client held by this provider.
+     *
+     * @throws IOException if closing the client fails
+     */
+    @Override
+    public void close() throws IOException {
+        httpClient.close();
+    }
+
+    /**
+     * Verify the signature and the required claims of a JWT with the given public key, and return the JWT
+     * only if every check passes.
+     *
+     * <p>The issuer is not checked here because it is verified when the public key is retrieved. The claims
+     * iat, exp, nbf and sub must be present, the audience must be one of the allowed audiences, and the role
+     * claim must be present when it is not the subject claim. Every failure increments the matching
+     * failure metric before the exception is thrown.
+     *
+     * @param publicKey - the public key to verify the signature with
+     * @param publicKeyAlg - the algorithm the public key is expected to be used with (one of the supported
+     *                     RS256/RS384/RS512/ES256/ES384/ES512 constants)
+     * @param jwt - jwt to be verified and returned (only if verified)
+     * @return the verified JWT
+     * @throws AuthenticationException if the algorithm is null or unsupported, if the public key's type does not
+     * match the algorithm, or if verification of the signature or of any claim fails.
+     */
+    DecodedJWT verifyJWT(PublicKey publicKey,
+                                String publicKeyAlg,
+                                DecodedJWT jwt) throws AuthenticationException {
+        if (publicKeyAlg == null) {
+            incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ALGORITHM);
+            throw new AuthenticationException("PublicKey algorithm cannot be null");
+        }
+
+        Algorithm alg;
+        try {
+            switch (publicKeyAlg) {
+                case ALG_RS256:
+                    alg = Algorithm.RSA256((RSAPublicKey) publicKey, null);
+                    break;
+                case ALG_RS384:
+                    alg = Algorithm.RSA384((RSAPublicKey) publicKey, null);
+                    break;
+                case ALG_RS512:
+                    alg = Algorithm.RSA512((RSAPublicKey) publicKey, null);
+                    break;
+                case ALG_ES256:
+                    alg = Algorithm.ECDSA256((ECPublicKey) publicKey, null);
+                    break;
+                case ALG_ES384:
+                    alg = Algorithm.ECDSA384((ECPublicKey) publicKey, null);
+                    break;
+                case ALG_ES512:
+                    alg = Algorithm.ECDSA512((ECPublicKey) publicKey, null);
+                    break;
+                default:
+                    incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ALGORITHM);
+                    throw new AuthenticationException("Unsupported algorithm: " + publicKeyAlg);
+            }
+        } catch (ClassCastException e) {
+            // The key object is not of the key family (RSA vs EC) that the requested algorithm needs.
+            incrementFailureMetric(AuthenticationExceptionCode.ALGORITHM_MISMATCH);
+            throw new AuthenticationException(
+                    "Expected PublicKey alg [" + publicKeyAlg + "] does not match actual alg.");
+        }
+
+        // We verify issuer when retrieving the PublicKey, so it is not verified here.
+        // The claim presence requirements are based on https://openid.net/specs/openid-connect-basic-1_0.html#IDToken
+        Verification verifierBuilder = JWT.require(alg)
+                .acceptLeeway(acceptedTimeLeewaySeconds)
+                .withAnyOfAudience(allowedAudiences)
+                .withClaimPresence(RegisteredClaims.ISSUED_AT)
+                .withClaimPresence(RegisteredClaims.EXPIRES_AT)
+                .withClaimPresence(RegisteredClaims.NOT_BEFORE)
+                .withClaimPresence(RegisteredClaims.SUBJECT);
+
+        // The subject claim is already required above, so only a custom role claim needs an extra check.
+        if (isRoleClaimNotSubject) {
+            verifierBuilder = verifierBuilder.withClaimPresence(roleClaim);
+        }
+
+        JWTVerifier verifier = verifierBuilder.build();
+
+        // The catch order matters: the specific exception types are handled before the generic
+        // JWTVerificationException so that each failure is counted under its own metric code.
+        try {
+            return verifier.verify(jwt);
+        } catch (TokenExpiredException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.EXPIRED_JWT);
+            throw new AuthenticationException("JWT expired: " + e.getMessage());
+        } catch (SignatureVerificationException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_VERIFYING_JWT_SIGNATURE);
+            throw new AuthenticationException("JWT signature verification exception: " + e.getMessage());
+        } catch (InvalidClaimException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.INVALID_JWT_CLAIM);
+            throw new AuthenticationException("JWT contains invalid claim: " + e.getMessage());
+        } catch (AlgorithmMismatchException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.ALGORITHM_MISMATCH);
+            throw new AuthenticationException("JWT algorithm does not match Public Key algorithm: " + e.getMessage());
+        } catch (JWTDecodeException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_DECODING_JWT);
+            throw new AuthenticationException("Error while decoding JWT: " + e.getMessage());
+        } catch (JWTVerificationException | IllegalArgumentException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_VERIFYING_JWT);
+            throw new AuthenticationException("JWT verification failed: " + e.getMessage());
+        }
+    }
+
+    /**
+     * Report an authentication failure to {@link AuthenticationMetrics} on behalf of this provider.
+     * The failure is recorded under this provider's {@code SIMPLE_NAME} with the auth method "token".
+     *
+     * @param code - the reason for the failure; its string form is passed on to the metric
+     */
+    static void incrementFailureMetric(AuthenticationExceptionCode code) {
+        AuthenticationMetrics.authenticateFailure(SIMPLE_NAME, "token", code.toString());
+    }
+
+    /**
+     * Validate the configured allow list of allowedIssuers. The allowedIssuers set must be nonempty in order for
+     * the plugin to authenticate any token. Thus, it fails initialization if the configuration is
+     * missing. Each issuer URL should use the HTTPS scheme. The plugin fails initialization if any
+     * issuer url is insecure, unless requireHttps is false.
+     * @param allowedIssuers - issuers to validate
+     * @param requireHttps - whether to require https for issuers.
+     * @param allowEmptyIssuers - whether to allow empty issuers. This setting only makes sense when kubernetes is used
+     *                   as a fallback issuer.
+     * @return the validated issuers
+     * @throws IllegalArgumentException if the allowedIssuers is empty, or contains insecure issuers when required
+     */
+    private Set<String> validateIssuers(Set<String> allowedIssuers, boolean requireHttps, boolean allowEmptyIssuers) {
+        if (allowedIssuers == null || (allowedIssuers.isEmpty() && !allowEmptyIssuers)) {
+            throw new IllegalArgumentException("Missing configured value for: " + ALLOWED_TOKEN_ISSUERS);
+        }
+        for (String issuer : allowedIssuers) {
+            if (!issuer.toLowerCase().startsWith("https://")) {
+                log.warn("Allowed issuer is not using https scheme: {}", issuer);
+                if (requireHttps) {
+                    throw new IllegalArgumentException("Issuer URL does not use https, but must: " + issuer);
+                }
+            }
+        }
+        return allowedIssuers;
+    }
+
+    /**
+     * Validate the configured allow list of allowedAudiences. The allowedAudiences must be set because
+     * JWT must have an audience claim.
+     * See https://openid.net/specs/openid-connect-basic-1_0.html#IDTokenValidation.
+     * @param allowedAudiences
+     * @return the validated audiences
+     */
+    String[] validateAllowedAudiences(Set<String> allowedAudiences) {
+        if (allowedAudiences == null || allowedAudiences.isEmpty()) {
+            throw new IllegalArgumentException("Missing configured value for: " + ALLOWED_AUDIENCES);
+        }
+        return allowedAudiences.toArray(new String[0]);
+    }
+}
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationStateOpenID.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationStateOpenID.java
new file mode 100644
index 00000000000..3046a6dd0e3
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationStateOpenID.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import java.net.SocketAddress;
+import java.util.concurrent.CompletableFuture;
+import javax.naming.AuthenticationException;
+import javax.net.ssl.SSLSession;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationDataSource;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+
+/**
+ * Class representing the authentication state of a single connection.
+ *
+ * <p>Authentication is single stage: {@link #authenticateAsync(AuthData)} hands the token to the
+ * {@link AuthenticationProviderOpenID} and, on success, records the role and the expiration time of the token.
+ * All mutable fields are volatile because they are written when the asynchronous verification completes and
+ * may be read through the getters from a different thread.
+ */
+class AuthenticationStateOpenID implements AuthenticationState {
+    private final AuthenticationProviderOpenID provider;
+    private final SocketAddress remoteAddress;
+    private final SSLSession sslSession;
+    // Set at the start of every authenticateAsync call, i.e. before the token has been verified.
+    private volatile AuthenticationDataSource authenticationDataSource;
+    // Non-null only after a successful authentication; isComplete() is derived from it.
+    private volatile String role;
+    // Expiration of the last verified token in epoch milliseconds; 0 until the first success.
+    private volatile long expiration;
+
+    AuthenticationStateOpenID(
+            AuthenticationProviderOpenID provider,
+            SocketAddress remoteAddress,
+            SSLSession sslSession) {
+        this.provider = provider;
+        this.remoteAddress = remoteAddress;
+        this.sslSession = sslSession;
+    }
+
+    /**
+     * @return the role taken from the verified token
+     * @throws AuthenticationException if no token has been verified successfully yet
+     */
+    @Override
+    public String getAuthRole() throws AuthenticationException {
+        if (role == null) {
+            throw new AuthenticationException("Authentication has not completed");
+        }
+        return role;
+    }
+
+    @Deprecated
+    @Override
+    public AuthData authenticate(AuthData authData) throws AuthenticationException {
+        // This method is not expected to be called and is subject to removal.
+        throw new AuthenticationException("Not supported");
+    }
+
+    /**
+     * Verify the token carried by {@code authData} and, on success, record its role and expiration.
+     *
+     * @param authData - the raw token bytes, decoded as UTF-8
+     * @return a future that completes with null once the token is verified (there is no further challenge to
+     *         send), or fails when the provider rejects the token
+     */
+    @Override
+    public CompletableFuture<AuthData> authenticateAsync(AuthData authData) {
+        final String token = new String(authData.getBytes(), UTF_8);
+        final AuthenticationDataSource dataSource = new AuthenticationDataCommand(token, remoteAddress, sslSession);
+        this.authenticationDataSource = dataSource;
+        return provider
+                .authenticateTokenAsync(dataSource)
+                .thenApply(jwt -> {
+                    final String verifiedRole = provider.getRole(jwt);
+                    // OIDC requires setting the exp claim, so this should never be null.
+                    // We verify it is not null during token validation.
+                    this.expiration = jwt.getExpiresAt().getTime();
+                    // Publish the role last: isComplete() keys off it, so a reader that observes a completed
+                    // state must also observe the matching expiration instead of the previous (or zero) value.
+                    this.role = verifiedRole;
+                    // Single stage authentication, so return null here
+                    return null;
+                });
+    }
+
+    @Override
+    public AuthenticationDataSource getAuthDataSource() {
+        return authenticationDataSource;
+    }
+
+    @Override
+    public boolean isComplete() {
+        return role != null;
+    }
+
+    /**
+     * @return true when the current time is past the recorded expiration. Because the expiration starts at 0,
+     *         this is also true before the first successful authentication.
+     */
+    @Override
+    public boolean isExpired() {
+        return System.currentTimeMillis() > expiration;
+    }
+}
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/ConfigUtils.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/ConfigUtils.java
new file mode 100644
index 00000000000..f62bf9c8186
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/ConfigUtils.java
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class ConfigUtils {
+    private static final Logger log = LoggerFactory.getLogger(ConfigUtils.class);
+
+    /**
+     * Get configured property as a string. If not configured, return null.
+     * @param conf - the configuration map
+     * @param configProp - the property to get
+     * @return a string from the conf or null, if the configuration property was not set
+     */
+    static String getConfigValueAsString(ServiceConfiguration conf,
+                                                String configProp) throws IllegalArgumentException {
+        String value = getConfigValueAsStringImpl(conf, configProp);
+        log.info("Configuration for [{}] is [{}]", configProp, value);
+        return value;
+    }
+
+    /**
+     * Get configured property as a string. If not configured, return null.
+     * @param conf - the configuration map
+     * @param configProp - the property to get
+     * @param defaultValue - the value to use if the configuration value is not set
+     * @return a string from the conf or the default value
+     */
+    static String getConfigValueAsString(ServiceConfiguration conf, String configProp,
+                                                String defaultValue) throws IllegalArgumentException {
+        String value = getConfigValueAsStringImpl(conf, configProp);
+        if (value == null) {
+            value = defaultValue;
+        }
+        log.info("Configuration for [{}] is [{}]", configProp, value);
+        return value;
+    }
+
+    /**
+     * Get configured property as a set. Split using a comma delimiter and remove any extra whitespace surrounding
+     * the commas. If not configured, return the empty set.
+     *
+     * @param conf - the map of configuration properties
+     * @param configProp - the property (key) to get
+     * @return a set of strings from the conf
+     */
+    static Set<String> getConfigValueAsSet(ServiceConfiguration conf, String configProp) {
+        String value = getConfigValueAsStringImpl(conf, configProp);
+        if (StringUtils.isBlank(value)) {
+            log.info("Configuration for [{}] is the empty set.", configProp);
+            return Collections.emptySet();
+        }
+        Set<String> set = Arrays.stream(value.trim().split("\\s*,\\s*")).collect(Collectors.toSet());
+        log.info("Configuration for [{}] is [{}].", configProp, String.join(", ", set));
+        return set;
+    }
+
+    private static String getConfigValueAsStringImpl(ServiceConfiguration conf,
+                                                     String configProp) throws IllegalArgumentException {
+        Object value = conf.getProperty(configProp);
+        if (value instanceof String) {
+            return (String) value;
+        } else {
+            return null;
+        }
+    }
+
+    /**
+     * Utility method to get an integer from the {@link ServiceConfiguration}. If the value is not a valid long or the
+     * key is not present in the conf, the default value will be used.
+     *
+     * @param conf - the map of configuration properties
+     * @param configProp - the property (key) to get
+     * @param defaultValue - the value to use if the property is missing from the conf
+     * @return a long
+     */
+    static int getConfigValueAsInt(ServiceConfiguration conf, String configProp, int defaultValue) {
+        Object value = conf.getProperty(configProp);
+        if (value instanceof Integer) {
+            log.info("Configuration for [{}] is [{}]", configProp, value);
+            return (Integer) value;
+        } else if (value instanceof String) {
+            try {
+                return Integer.parseInt((String) value);
+            } catch (NumberFormatException numberFormatException) {
+                log.error("Expected configuration for [{}] to be a long, but got [{}]. Using default value: [{}]",
+                        configProp, value, defaultValue, numberFormatException);
+                return defaultValue;
+            }
+        } else {
+            log.info("Configuration for [{}] is using the default value: [{}]", configProp, defaultValue);
+            return defaultValue;
+        }
+    }
+
+    /**
+     * Utility method to get a boolean from the {@link ServiceConfiguration}. If the key is present in the conf,
+     * return the default value. If key is present the value is not a valid boolean, the result will be false.
+     *
+     * @param conf - the map of configuration properties
+     * @param configProp - the property (key) to get
+     * @param defaultValue - the value to use if the property is missing from the conf
+     * @return a boolean
+     */
+    static boolean getConfigValueAsBoolean(ServiceConfiguration conf, String configProp, boolean defaultValue) {
+        Object value = conf.getProperty(configProp);
+        if (value instanceof Boolean) {
+            log.info("Configuration for [{}] is [{}]", configProp, value);
+            return (boolean) value;
+        } else if (value instanceof String) {
+            boolean result = Boolean.parseBoolean((String) value);
+            log.info("Configuration for [{}] is [{}]", configProp, result);
+            return result;
+        } else {
+            log.info("Configuration for [{}] is using the default value: [{}]", configProp, defaultValue);
+            return defaultValue;
+        }
+    }
+}
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/FallbackDiscoveryMode.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/FallbackDiscoveryMode.java
new file mode 100644
index 00000000000..5bf0c1b23fc
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/FallbackDiscoveryMode.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import org.apache.pulsar.common.classification.InterfaceStability;
+
+/**
+ * These are the modes available for configuring how the Open ID Connect Authentication Provider should handle a JWT
+ * that has an issuer that is not explicitly in the allowed issuers set configured by
+ * {@link AuthenticationProviderOpenID#ALLOWED_TOKEN_ISSUERS}. The current implementations rely on using the Kubernetes
+ * Api Server's Open ID Connect features to discover an additional issuer or additional public keys to trust. See the
+ * Kubernetes documentation for more information on how Service Accounts can integrate with Open ID Connect.
+ * https://kubernetes.io/docs/tasks/configure-pod-container/configure-service-account/#service-account-issuer-discovery
+ */
+@InterfaceStability.Evolving
+public enum FallbackDiscoveryMode {
+    /**
+     * There will be no discovery of additional trusted issuers or public keys. This setting requires that operators
+     * explicitly allow all issuers that will be trusted. For the Kubernetes Service Account Token Projections to work,
+     * the operator must explicitly trust the issuer on the token's "iss" claim. This is the default setting because it
+     * is the only mode that explicitly follows the OIDC spec for verification of discovered provider configuration.
+     */
+    DISABLED,
+
+    /**
+     * The Kubernetes Api Server will be used to discover an additional trusted issuer by getting the issuer at the
+     * Api Server's /.well-known/openid-configuration endpoint, verifying that issuer matches the "iss" claim on the
+     * supplied token, then treating that issuer as a trusted issuer by discovering the jwks_uri via that issuer's
+     * /.well-known/openid-configuration endpoint. This mode can be helpful in EKS environments where the Api Server's
+     * public keys served at the /openid/v1/jwks endpoint are not the same as the public keys served at the issuer's
+     * jwks_uri. It fails to be OIDC compliant because the URL used to discover the provider configuration is not the
+     * same as the issuer claim on the token.
+     */
+    KUBERNETES_DISCOVER_TRUSTED_ISSUER,
+
+    /**
+     * The Kubernetes Api Server will be used to discover an additional set of valid public keys by getting the issuer
+     * at the Api Server's /.well-known/openid-configuration endpoint, verifying that issuer matches the "iss" claim on
+     * the supplied token, then calling the Api Server endpoint to get the public keys using a kubernetes client. This
+     * mode is currently useful for getting the public keys from the Api Server because the Api Server requires custom
+     * TLS and authentication, and the kubernetes client automatically handles those. It fails to be OIDC compliant
+     * because the URL used to discover the provider configuration is not the same as the issuer claim on the token.
+     */
+    KUBERNETES_DISCOVER_PUBLIC_KEYS,
+}
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
new file mode 100644
index 00000000000..12ea7ec6b90
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/JwksCache.java
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
+import static org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
+import com.auth0.jwk.Jwk;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import io.kubernetes.client.openapi.ApiCallback;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.OpenidApi;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.asynchttpclient.AsyncHttpClient;
+
+/**
+ * Cache of JSON Web Key Sets (JWKS) used to look up the public key that signed a token.
+ *
+ * <p>Key sets are loaded either from an issuer's JWKS URI with the async http client or, when Kubernetes
+ * fallback discovery is used, from the Kubernetes API server with the kubernetes client. Cache size, refresh
+ * and expiration are read from the broker configuration.
+ */
+public class JwksCache {
+
+    // Map from an issuer's JWKS URI to its JWKS. The empty key stands for the key set served by the Kubernetes
+    // API server, which is loaded with the kubernetes client instead of the http client.
+    private final AsyncLoadingCache<Optional<String>, List<Jwk>> cache;
+
+    private final ObjectReader reader = new ObjectMapper().readerFor(HashMap.class);
+    private final AsyncHttpClient httpClient;
+    // Null when no kubernetes ApiClient was supplied, i.e. the Kubernetes fallback is not available.
+    private final OpenidApi openidApi;
+
+    /**
+     * @param config - broker configuration holding the cache size, refresh and expiration settings
+     * @param httpClient - client used to download a JWKS from a JWKS URI
+     * @param apiClient - kubernetes client used to get the API server's JWKS; may be null
+     */
+    JwksCache(ServiceConfiguration config, AsyncHttpClient httpClient, ApiClient apiClient) throws IOException {
+        // Store the clients
+        this.httpClient = httpClient;
+        this.openidApi = apiClient != null ? new OpenidApi(apiClient) : null;
+
+        // Configure the cache
+        int maxSize = getConfigValueAsInt(config, CACHE_SIZE, CACHE_SIZE_DEFAULT);
+        int refreshAfterWriteSeconds = getConfigValueAsInt(config, CACHE_REFRESH_AFTER_WRITE_SECONDS,
+                CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT);
+        int expireAfterSeconds = getConfigValueAsInt(config, CACHE_EXPIRATION_SECONDS,
+                CACHE_EXPIRATION_SECONDS_DEFAULT);
+        AsyncCacheLoader<Optional<String>, List<Jwk>> loader = (jwksUri, executor) -> {
+            if (jwksUri.isPresent()) {
+                return getJwksFromJwksUri(jwksUri.get());
+            } else {
+                return getJwksFromKubernetesApiServer();
+            }
+        };
+        this.cache = Caffeine.newBuilder()
+                .maximumSize(maxSize)
+                .refreshAfterWrite(refreshAfterWriteSeconds, TimeUnit.SECONDS)
+                .expireAfterWrite(expireAfterSeconds, TimeUnit.SECONDS)
+                .buildAsync(loader);
+    }
+
+    /**
+     * Get the JWK with the given key id from the key set published at the given JWKS URI.
+     *
+     * @param jwksUri - where the issuer publishes its JWKS; must not be null
+     * @param keyId - the key id (kid) of the wanted key
+     * @return a future holding the matching JWK; it fails when the URI is null, when the key set cannot be
+     *         retrieved or when the key set has no key with the given id
+     */
+    CompletableFuture<Jwk> getJwk(String jwksUri, String keyId) {
+        if (jwksUri == null) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+            return CompletableFuture.failedFuture(new IllegalArgumentException("jwksUri must not be null."));
+        }
+        return cache.get(Optional.of(jwksUri)).thenApply(jwks -> getJwkForKID(jwks, keyId));
+    }
+
+    /**
+     * Download and parse the JWKS published at the given URI.
+     */
+    private CompletableFuture<List<Jwk>> getJwksFromJwksUri(String jwksUri) {
+        return httpClient
+                .prepareGet(jwksUri)
+                .execute()
+                .toCompletableFuture()
+                .thenCompose(result -> {
+                    CompletableFuture<List<Jwk>> future = new CompletableFuture<>();
+                    try {
+                        HashMap<String, Object> jwks =
+                                reader.readValue(result.getResponseBodyAsBytes());
+                        future.complete(convertToJwks(jwksUri, jwks));
+                    } catch (AuthenticationException e) {
+                        incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                        future.completeExceptionally(e);
+                    } catch (Exception e) {
+                        incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                        future.completeExceptionally(new AuthenticationException(
+                                "Error retrieving public key at " + jwksUri + ": " + e.getMessage()));
+                    }
+                    return future;
+                });
+    }
+
+    /**
+     * Get the JWK with the given key id from the key set served by the Kubernetes API server.
+     *
+     * @param keyId - the key id (kid) of the wanted key
+     * @return a future holding the matching JWK; it fails when no kubernetes client was configured, when the
+     *         key set cannot be retrieved or when the key set has no key with the given id
+     */
+    CompletableFuture<Jwk> getJwkFromKubernetesApiServer(String keyId) {
+        if (openidApi == null) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+            return CompletableFuture.failedFuture(new AuthenticationException(
+                    "Failed to retrieve public key from Kubernetes API server: Kubernetes fallback is not enabled."));
+        }
+        return cache.get(Optional.empty(), (__, executor) -> getJwksFromKubernetesApiServer())
+                .thenApply(jwks -> getJwkForKID(jwks, keyId));
+    }
+
+    /**
+     * Ask the Kubernetes API server for its JWKS and parse the response. The callback based kubernetes client
+     * call is bridged to a {@link CompletableFuture}.
+     */
+    private CompletableFuture<List<Jwk>> getJwksFromKubernetesApiServer() {
+        CompletableFuture<List<Jwk>> future = new CompletableFuture<>();
+        try {
+            openidApi.getServiceAccountIssuerOpenIDKeysetAsync(new ApiCallback<String>() {
+                @Override
+                public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
+                    incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                    future.completeExceptionally(
+                            new AuthenticationException("Failed to retrieve public key from Kubernetes API server: "
+                                    + e.getMessage()));
+                }
+
+                @Override
+                public void onSuccess(String result, int statusCode, Map<String, List<String>> responseHeaders) {
+                    try {
+                        HashMap<String, Object> jwks = reader.readValue(result);
+                        future.complete(convertToJwks("Kubernetes API server", jwks));
+                    } catch (AuthenticationException e) {
+                        incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                        future.completeExceptionally(e);
+                    } catch (Exception e) {
+                        incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+                        future.completeExceptionally(new AuthenticationException(
+                                "Error retrieving public key at Kubernetes API server: " + e.getMessage()));
+                    }
+                }
+
+                @Override
+                public void onUploadProgress(long bytesWritten, long contentLength, boolean done) {
+                    // Progress is not tracked.
+                }
+
+                @Override
+                public void onDownloadProgress(long bytesRead, long contentLength, boolean done) {
+                    // Progress is not tracked.
+                }
+            });
+        } catch (ApiException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+            future.completeExceptionally(
+                    new AuthenticationException("Failed to retrieve public key from Kubernetes API server: "
+                            + e.getMessage()));
+        }
+        return future;
+    }
+
+    /**
+     * Find the JWK with the given key id.
+     *
+     * @param jwks - the key set to search
+     * @param keyId - the key id (kid) to look for; a null key id never matches
+     * @return the first JWK whose id equals the key id
+     * @throws IllegalArgumentException if no key matches
+     */
+    private Jwk getJwkForKID(List<Jwk> jwks, String keyId) {
+        if (keyId != null) {
+            for (Jwk jwk : jwks) {
+                // The "kid" member is optional in a JWK, so jwk.getId() may be null; compare from the
+                // non-null side to skip such keys instead of failing with a NullPointerException.
+                if (keyId.equals(jwk.getId())) {
+                    return jwk;
+                }
+            }
+        }
+        incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PUBLIC_KEY);
+        throw new IllegalArgumentException("No JWK found for Key ID " + keyId);
+    }
+
+    /**
+     * The JWK Set is stored in the "keys" key see https://www.rfc-editor.org/rfc/rfc7517#section-5.1.
+     *
+     * @param jwksUri - the URI used to retrieve the JWKS
+     * @param jwks - the JWKS to convert
+     * @return a list of {@link Jwk}
+     * @throws AuthenticationException if the document is null, has no "keys" member, or "keys" does not have the
+     * expected structure
+     */
+    private List<Jwk> convertToJwks(String jwksUri, Map<String, Object> jwks) throws AuthenticationException {
+        // A response without the mandatory "keys" member (for example a JSON error document) is malformed.
+        if (jwks == null || jwks.get("keys") == null) {
+            throw new AuthenticationException("Malformed JWKS returned by: " + jwksUri);
+        }
+        try {
+            @SuppressWarnings("unchecked")
+            List<Map<String, Object>> jwkList = (List<Map<String, Object>>) jwks.get("keys");
+            final List<Jwk> result = new ArrayList<>();
+            for (Map<String, Object> jwk : jwkList) {
+                result.add(Jwk.fromValues(jwk));
+            }
+            return result;
+        } catch (ClassCastException e) {
+            throw new AuthenticationException("Malformed JWKS returned by: " + jwksUri);
+        }
+    }
+}
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadata.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadata.java
new file mode 100644
index 00000000000..553b3b882db
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadata.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonGetter;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * A Simple Class representing the essential fields of the OpenID Provider Metadata.
+ * Spec: https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata
+ * Note that this class is only used for deserializing the JSON metadata response from
+ * calling a provider's /.well-known/openid-configuration endpoint.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class OpenIDProviderMetadata {
+
+    // Bound from the "issuer" JSON property.
+    private final String issuer;
+    // Bound from the "jwks_uri" JSON property.
+    private final String jwksUri;
+
+    /**
+     * Property-based creator used when deserializing the metadata document.
+     *
+     * @param issuer - value of the "issuer" property
+     * @param jwksUri - value of the "jwks_uri" property
+     */
+    @JsonCreator(mode = JsonCreator.Mode.PROPERTIES)
+    public OpenIDProviderMetadata(
+            @JsonProperty("issuer") String issuer,
+            @JsonProperty("jwks_uri") String jwksUri) {
+        this.issuer = issuer;
+        this.jwksUri = jwksUri;
+    }
+
+    /**
+     * @return the issuer value this instance was created with
+     */
+    @JsonGetter
+    public String getIssuer() {
+        return this.issuer;
+    }
+
+    /**
+     * @return the JWKS URI value this instance was created with
+     */
+    @JsonGetter
+    public String getJwksUri() {
+        return this.jwksUri;
+    }
+}
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
new file mode 100644
index 00000000000..33d11f35a34
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/OpenIDProviderMetadataCache.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_EXPIRATION_SECONDS_DEFAULT;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.CACHE_SIZE_DEFAULT;
+import static org.apache.pulsar.broker.authentication.oidc.AuthenticationProviderOpenID.incrementFailureMetric;
+import static org.apache.pulsar.broker.authentication.oidc.ConfigUtils.getConfigValueAsInt;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import io.kubernetes.client.openapi.ApiCallback;
+import io.kubernetes.client.openapi.ApiClient;
+import io.kubernetes.client.openapi.ApiException;
+import io.kubernetes.client.openapi.apis.WellKnownApi;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import javax.annotation.Nonnull;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.asynchttpclient.AsyncHttpClient;
+
+/**
+ * Class used to cache metadata responses from OpenID Providers.
+ */
+class OpenIDProviderMetadataCache {
+
+    // Reads JSON documents into OpenIDProviderMetadata instances.
+    private final ObjectReader reader = new ObjectMapper().readerFor(OpenIDProviderMetadata.class);
+    // Client used to call an issuer's discovery endpoint.
+    private final AsyncHttpClient httpClient;
+    // Kubernetes discovery API wrapper; null when the constructor is given no ApiClient.
+    private final WellKnownApi wellKnownApi;
+    // Keyed by issuer; the empty Optional is the key for the Kubernetes API server's metadata.
+    private final AsyncLoadingCache<Optional<String>, OpenIDProviderMetadata> cache;
+    // Discovery path appended to an issuer that already ends with a slash.
+    private static final String WELL_KNOWN_OPENID_CONFIG = ".well-known/openid-configuration";
+    // Discovery path appended to an issuer that does not end with a slash.
+    private static final String SLASH_WELL_KNOWN_OPENID_CONFIG = "/" + WELL_KNOWN_OPENID_CONFIG;
+
+    /**
+     * Build the cache from the broker configuration.
+     *
+     * @param config - source of the cache size, refresh-after-write and expiration settings
+     * @param httpClient - client used to fetch metadata from an issuer
+     * @param apiClient - Kubernetes client used to fetch the API server's metadata; may be null
+     */
+    OpenIDProviderMetadataCache(ServiceConfiguration config, AsyncHttpClient httpClient, ApiClient apiClient) {
+        final int cacheSize = getConfigValueAsInt(config, CACHE_SIZE, CACHE_SIZE_DEFAULT);
+        final int refreshSeconds = getConfigValueAsInt(
+                config, CACHE_REFRESH_AFTER_WRITE_SECONDS, CACHE_REFRESH_AFTER_WRITE_SECONDS_DEFAULT);
+        final int expirationSeconds = getConfigValueAsInt(
+                config, CACHE_EXPIRATION_SECONDS, CACHE_EXPIRATION_SECONDS_DEFAULT);
+
+        this.httpClient = httpClient;
+        if (apiClient == null) {
+            this.wellKnownApi = null;
+        } else {
+            this.wellKnownApi = new WellKnownApi(apiClient);
+        }
+
+        // A present key is an issuer URL; the empty key selects the Kubernetes API server.
+        AsyncCacheLoader<Optional<String>, OpenIDProviderMetadata> metadataLoader = (key, executor) ->
+                key.isPresent()
+                        ? loadOpenIDProviderMetadataForIssuer(key.get())
+                        : loadOpenIDProviderMetadataForKubernetesApiServer();
+
+        this.cache = Caffeine.newBuilder()
+                .maximumSize(cacheSize)
+                .refreshAfterWrite(refreshSeconds, TimeUnit.SECONDS)
+                .expireAfterWrite(expirationSeconds, TimeUnit.SECONDS)
+                .buildAsync(metadataLoader);
+    }
+
+    /**
+     * Retrieve the OpenID Provider Metadata for the provided issuer.
+     * <p>
+     * Note: this method does not do any validation on the parameterized issuer. The OpenID Connect discovery
+     * spec requires that the issuer use the HTTPS scheme: https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderMetadata.
+     * The {@link AuthenticationProviderOpenID} class handles this verification.
+     *
+     * @param issuer - authority from which to retrieve the OpenID Provider Metadata
+     * @return the {@link OpenIDProviderMetadata} for the given issuer. Fail the completable future with
+     * AuthenticationException if any exceptions occur while retrieving the metadata.
+     */
+    CompletableFuture<OpenIDProviderMetadata> getOpenIDProviderMetadataForIssuer(@Nonnull String issuer) {
+        // Issuer-backed entries are stored under a present Optional key.
+        final Optional<String> cacheKey = Optional.of(issuer);
+        return cache.get(cacheKey);
+    }
+
+    /**
+     * A loader for the cache that retrieves the metadata from the issuer's /.well-known/openid-configuration endpoint.
+     *
+     * @param issuer - the issuer whose discovery document is fetched
+     * @return a future holding the parsed {@link OpenIDProviderMetadata} after its issuer has been verified
+     * against the requested issuer. The future fails with AuthenticationException if the request cannot be
+     * created or executed, the response cannot be parsed, or the issuer does not match.
+     */
+    private CompletableFuture<OpenIDProviderMetadata> loadOpenIDProviderMetadataForIssuer(String issuer) {
+        String url;
+        // TODO URI's normalization likely follows RFC2396 (library doesn't say so explicitly), whereas the spec
+        //  https://openid.net/specs/openid-connect-discovery-1_0.html#NormalizationSteps
+        //  calls for normalization according to RFC3986, which is supposed to obsolete RFC2396. Is this a problem?
+        if (issuer.endsWith("/")) {
+            url = issuer + WELL_KNOWN_OPENID_CONFIG;
+        } else {
+            url = issuer + SLASH_WELL_KNOWN_OPENID_CONFIG;
+        }
+
+        try {
+            return httpClient
+                    .prepareGet(url)
+                    .execute()
+                    .toCompletableFuture()
+                    // handle (rather than thenCompose) so that a failed request is also converted into an
+                    // AuthenticationException and counted in the failure metric.
+                    .handle((result, error) -> {
+                        CompletableFuture<OpenIDProviderMetadata> future = new CompletableFuture<>();
+                        if (error != null) {
+                            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                            future.completeExceptionally(new AuthenticationException(
+                                    "Error retrieving OpenID Provider Metadata at " + issuer + ": "
+                                            + error.getMessage()));
+                            return future;
+                        }
+                        try {
+                            OpenIDProviderMetadata openIDProviderMetadata =
+                                    reader.readValue(result.getResponseBodyAsBytes());
+                            // We can verify this issuer once and cache the result because the issuer uniquely maps
+                            // to the cached object.
+                            verifyIssuer(issuer, openIDProviderMetadata, false);
+                            future.complete(openIDProviderMetadata);
+                        } catch (AuthenticationException e) {
+                            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                            future.completeExceptionally(e);
+                        } catch (Exception e) {
+                            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                            future.completeExceptionally(new AuthenticationException(
+                                    "Error retrieving OpenID Provider Metadata at " + issuer + ": " + e.getMessage()));
+                        }
+                        return future;
+                    })
+                    .thenCompose(parsed -> parsed);
+        } catch (RuntimeException e) {
+            // Building or dispatching the request failed synchronously (for example, the URL was rejected).
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+            return CompletableFuture.failedFuture(new AuthenticationException(
+                    "Error retrieving OpenID Provider Metadata at " + issuer + ": " + e.getMessage()));
+        }
+    }
+
+    /**
+     * Retrieve the OpenID Provider Metadata for the Kubernetes API server. This method is used instead of
+     * {@link #getOpenIDProviderMetadataForIssuer(String)} because different validations are done. The Kubernetes
+     * API server does not technically implement the complete OIDC spec for discovery, but it does implement some of
+     * it, so this method validates what it can. Specifically, it skips validation that the Discovery Document
+     * provider's URI matches the issuer. It verifies that the issuer on the discovery document matches the issuer
+     * claim.
+     *
+     * @param issClaim - the issuer claim to compare against the issuer in the API server's metadata
+     * @return the {@link OpenIDProviderMetadata} cached for the Kubernetes API server. The future fails with
+     * AuthenticationException when the metadata's issuer differs from issClaim.
+     */
+    CompletableFuture<OpenIDProviderMetadata> getOpenIDProviderMetadataForKubernetesApiServer(String issClaim) {
+        return cache.get(Optional.empty()).thenCompose(metadata -> {
+            try {
+                // Checked on every call (not in the loader) because the cached entry is shared by all tokens.
+                verifyIssuer(issClaim, metadata, true);
+            } catch (AuthenticationException e) {
+                // Recorded in addition to the failure code that verifyIssuer has already counted.
+                incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                return CompletableFuture.<OpenIDProviderMetadata>failedFuture(e);
+            }
+            return CompletableFuture.completedFuture(metadata);
+        });
+    }
+
+    /**
+     * A loader for the cache that retrieves the metadata from the Kubernetes API server through the
+     * {@link WellKnownApi}.
+     *
+     * @return a future holding the parsed {@link OpenIDProviderMetadata}. The future fails with
+     * AuthenticationException if no Kubernetes client was configured, the call fails, or the response cannot
+     * be parsed. The issuer is not verified here.
+     */
+    private CompletableFuture<OpenIDProviderMetadata> loadOpenIDProviderMetadataForKubernetesApiServer() {
+        CompletableFuture<OpenIDProviderMetadata> future = new CompletableFuture<>();
+        if (wellKnownApi == null) {
+            // The constructor leaves wellKnownApi null when it is given no ApiClient. Fail the future rather
+            // than letting a NullPointerException escape from the cache loader.
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+            future.completeExceptionally(new AuthenticationException(
+                    "Error retrieving OpenID Provider Metadata from Kubernetes API server: "
+                            + "Kubernetes API client is not configured"));
+            return future;
+        }
+        try {
+            wellKnownApi.getServiceAccountIssuerOpenIDConfigurationAsync(new ApiCallback<>() {
+                @Override
+                public void onFailure(ApiException e, int statusCode, Map<String, List<String>> responseHeaders) {
+                    incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                    future.completeExceptionally(new AuthenticationException(
+                            "Error retrieving OpenID Provider Metadata from Kubernetes API server: "
+                                    + e.getMessage()));
+                }
+
+                @Override
+                public void onSuccess(String result, int statusCode, Map<String, List<String>> responseHeaders) {
+                    try {
+                        // Validation that the token's issuer matches the issuer returned by the api server must be done
+                        // after the cache load operation to ensure each token's issuer matches the fallback issuer
+                        OpenIDProviderMetadata openIDProviderMetadata = reader.readValue(result);
+                        future.complete(openIDProviderMetadata);
+                    } catch (Exception e) {
+                        incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+                        future.completeExceptionally(new AuthenticationException(
+                                "Error retrieving OpenID Provider Metadata from Kubernetes API Server: "
+                                        + e.getMessage()));
+                    }
+                }
+
+                @Override
+                public void onUploadProgress(long bytesWritten, long contentLength, boolean done) {
+                    // Progress is not tracked.
+                }
+
+                @Override
+                public void onDownloadProgress(long bytesRead, long contentLength, boolean done) {
+                    // Progress is not tracked.
+                }
+            });
+        } catch (ApiException e) {
+            incrementFailureMetric(AuthenticationExceptionCode.ERROR_RETRIEVING_PROVIDER_METADATA);
+            future.completeExceptionally(new AuthenticationException(
+                    "Error retrieving OpenID Provider Metadata from Kubernetes API server: " + e.getMessage()));
+        }
+        return future;
+    }
+
+    /**
+     * Verify the issuer url, as required by the OpenID Connect spec:
+     *
+     * Per the OpenID Connect Discovery spec, the issuer value returned MUST be identical to the
+     * Issuer URL that was directly used to retrieve the configuration information. This MUST also
+     * be identical to the iss Claim value in ID Tokens issued from this Issuer.
+     * https://openid.net/specs/openid-connect-discovery-1_0.html#ProviderConfigurationValidation
+     *
+     * @param issuer - the issuer used to retrieve the metadata
+     * @param metadata - the OpenID Provider Metadata
+     * @param isK8s - whether the issuer is represented by the Kubernetes API server. This affects error reporting.
+     * @throws AuthenticationException if the issuer does not exactly match the metadata issuer
+     */
+    private void verifyIssuer(@Nonnull String issuer, OpenIDProviderMetadata metadata,
+                              boolean isK8s) throws AuthenticationException {
+        final String metadataIssuer = metadata.getIssuer();
+        if (issuer.equals(metadataIssuer)) {
+            // Exact match: nothing to report.
+            return;
+        }
+        if (!isK8s) {
+            incrementFailureMetric(AuthenticationExceptionCode.ISSUER_MISMATCH);
+            throw new AuthenticationException(String.format("Issuer URL mismatch: [%s] should match [%s]",
+                    issuer, metadataIssuer));
+        }
+        // For the Kubernetes API server the mismatch is reported as a disallowed issuer.
+        incrementFailureMetric(AuthenticationExceptionCode.UNSUPPORTED_ISSUER);
+        throw new AuthenticationException("Issuer not allowed: " + issuer);
+    }
+}
diff --git a/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/package-info.java b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/package-info.java
new file mode 100644
index 00000000000..d28f255d1be
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/main/java/org/apache/pulsar/broker/authentication/oidc/package-info.java
@@ -0,0 +1,19 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
new file mode 100644
index 00000000000..298492652c0
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDIntegrationTest.java
@@ -0,0 +1,530 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
+import static com.github.tomakehurst.wiremock.client.WireMock.get;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlEqualTo;
+import static com.github.tomakehurst.wiremock.client.WireMock.urlMatching;
+import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.wireMockConfig;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import static org.testng.Assert.fail;
+import com.github.tomakehurst.wiremock.WireMockServer;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.impl.DefaultJwtBuilder;
+import io.jsonwebtoken.security.Keys;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.security.KeyPair;
+import java.security.PrivateKey;
+import java.security.interfaces.RSAPublicKey;
+import java.util.Base64;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.ExecutionException;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.apache.pulsar.broker.authentication.AuthenticationState;
+import org.apache.pulsar.common.api.AuthData;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * An integration test relying on WireMock to simulate an OpenID Connect provider.
+ */
+public class AuthenticationProviderOpenIDIntegrationTest {
+
+    // Provider under test; created and initialized in beforeClass
+    AuthenticationProviderOpenID provider;
+    // Private half of the RSA key pair generated in beforeClass
+    PrivateKey privateKey;
+
+    // These are the kid values for JWKs in the /keys endpoint
+    String validJwk = "valid";
+    String invalidJwk = "invalid";
+
+    // The valid issuer
+    String issuer;
+    // The valid issuer followed by "/trailing-slash/"
+    String issuerWithTrailingSlash;
+    // This issuer is configured to return an issuer in the openid-configuration
+    // that does not match the issuer on the token
+    String issuerThatFails;
+    // The valid issuer followed by "/k8s"
+    String issuerK8s;
+    // WireMock server that stands in for the OpenID Connect provider
+    WireMockServer server;
+
+    /**
+     * Start the WireMock server, register the discovery and JWKS stubs, write the fake kube config file and
+     * initialize the shared provider.
+     *
+     * @throws IOException if the kube config template cannot be read or the kube config file cannot be written
+     */
+    @BeforeClass
+    void beforeClass() throws IOException {
+
+        // WireMock is started on a dynamically assigned port (port 0). The fake kube config written at the end
+        // of this method is generated from a template with that port substituted in, which is how the k8s
+        // integration tests reach this server.
+        server = new WireMockServer(wireMockConfig().port(0));
+        server.start();
+        issuer = server.baseUrl();
+        issuerWithTrailingSlash = issuer + "/trailing-slash/";
+        issuerThatFails = issuer + "/fail";
+        issuerK8s = issuer + "/k8s";
+
+        // Set up a correct openid-configuration
+        // (replace substitutes every "%s" in the body with the server's base URL)
+        server.stubFor(
+                get(urlEqualTo("/.well-known/openid-configuration"))
+                        .willReturn(aResponse()
+                                .withHeader("Content-Type", "application/json")
+                                .withBody("""
+                                        {
+                                          "issuer": "%s",
+                                          "jwks_uri": "%s/keys"
+                                        }
+                                        """.replace("%s", server.baseUrl()))));
+
+        // Set up a correct openid-configuration that the k8s integration test can use
+        // NOTE: integration tests revealed that the k8s client adds a trailing slash to the openid-configuration
+        // endpoint.
+        // NOTE: the jwks_uri is ignored, so we supply one that would fail here to ensure that we are not implicitly
+        // relying on the jwks_uri.
+        server.stubFor(
+                get(urlEqualTo("/k8s/.well-known/openid-configuration/"))
+                        .willReturn(aResponse()
+                                .withHeader("Content-Type", "application/json")
+                                .withBody("""
+                                        {
+                                          "issuer": "%s",
+                                          "jwks_uri": "%s/no/keys/hosted/here"
+                                        }
+                                        """.formatted(issuer, issuer))));
+
+        // Set up a correct openid-configuration that has a trailing slash in the issuers URL. This is a
+        // behavior observed by Auth0. In this case, the token's iss claim also has a trailing slash.
+        // The server should normalize the URL and call the Authorization Server without the double slash.
+        // NOTE: the spec does not indicate that the jwks_uri must have the same prefix as the issuer, and that
+        // is used here to simplify the testing.
+        server.stubFor(
+                get(urlEqualTo("/trailing-slash/.well-known/openid-configuration"))
+                        .willReturn(aResponse()
+                                .withHeader("Content-Type", "application/json")
+                                .withBody("""
+                                        {
+                                          "issuer": "%s",
+                                          "jwks_uri": "%s/keys"
+                                        }
+                                        """.formatted(issuerWithTrailingSlash, issuer))));
+
+        // Set up an incorrect openid-configuration where issuer does not match
+        server.stubFor(
+                get(urlEqualTo("/fail/.well-known/openid-configuration"))
+                        .willReturn(aResponse()
+                                .withHeader("Content-Type", "application/json")
+                                .withBody("""
+                                        {
+                                          "issuer": "https://wrong-issuer.com",
+                                          "jwks_uri": "%s/keys"
+                                        }
+                                        """.formatted(server.baseUrl()))));
+
+        // Create the token key pair
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        privateKey = keyPair.getPrivate();
+        RSAPublicKey rsaPublicKey = (RSAPublicKey) keyPair.getPublic();
+        // The public key's modulus and exponent are published below as the JWK "n" and "e" members
+        String n = Base64.getUrlEncoder().encodeToString(rsaPublicKey.getModulus().toByteArray());
+        String e = Base64.getUrlEncoder().encodeToString(rsaPublicKey.getPublicExponent().toByteArray());
+
+        // Set up JWKS endpoint with a valid and an invalid public key
+        // The url matches are for both the normal and the k8s endpoints
+        server.stubFor(
+                get(urlMatching( "/keys|/k8s/openid/v1/jwks/"))
+                        .willReturn(aResponse()
+                                .withHeader("Content-Type", "application/json")
+                                .withBody(
+                                        """
+                                        {
+                                            "keys" : [
+                                                {
+                                                "kid":"%s",
+                                                "kty":"RSA",
+                                                "alg":"RS256",
+                                                "n":"%s",
+                                                "e":"%s"
+                                                },
+                                                {
+                                                "kid": "%s",
+                                                "kty":"RSA",
+                                                "n":"invalid-key",
+                                                "e":"AQAB"
+                                                }
+                                            ]
+                                        }
+                                        """.formatted(validJwk, n, e, invalidJwk))));
+
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName()));
+        Properties props = conf.getProperties();
+        // The WireMock issuers are not served over https in this test
+        props.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer + "," + issuerWithTrailingSlash
+                + "," + issuerThatFails);
+
+        // Create the fake kube config file. This file is configured via the env vars and is written to the
+        // target directory so maven clean will remove it.
+        byte[] template = Files.readAllBytes(Path.of(System.getenv("KUBECONFIG_TEMPLATE")));
+        String kubeConfig = new String(template).replace("${WIRE_MOCK_PORT}", String.valueOf(server.port()));
+        Files.write(Path.of(System.getenv("KUBECONFIG")), kubeConfig.getBytes());
+
+        provider = new AuthenticationProviderOpenID();
+        provider.initialize(conf);
+    }
+
+    /**
+     * Stop the WireMock server started in beforeClass.
+     */
+    @AfterClass
+    void afterClass() {
+        server.stop();
+    }
+
+    @Test
+    public void testTokenWithValidJWK() throws Exception {
+        // A token for the valid issuer that references the valid kid authenticates as its role.
+        final String role = "superuser";
+        final String token = generateToken(validJwk, issuer, role, "allowed-audience", 0L, 0L, 10000L);
+        final AuthenticationDataCommand authData = new AuthenticationDataCommand(token);
+        assertEquals(role, provider.authenticateAsync(authData).get());
+    }
+
+    @Test
+    public void testTokenWithTrailingSlashAndValidJWK() throws Exception {
+        // Same as the valid-JWK case, but the token's issuer ends with a trailing slash.
+        final String role = "superuser";
+        final String token =
+                generateToken(validJwk, issuerWithTrailingSlash, role, "allowed-audience", 0L, 0L, 10000L);
+        final AuthenticationDataCommand authData = new AuthenticationDataCommand(token);
+        assertEquals(role, provider.authenticateAsync(authData).get());
+    }
+
+    @Test
+    public void testTokenWithInvalidJWK() throws Exception {
+        // The token references the kid of the JWK that was published with an unusable modulus.
+        final String token = generateToken(invalidJwk, issuer, "superuser", "allowed-audience", 0L, 0L, 10000L);
+        ExecutionException failure = null;
+        try {
+            provider.authenticateAsync(new AuthenticationDataCommand(token)).get();
+        } catch (ExecutionException e) {
+            failure = e;
+        }
+        if (failure == null) {
+            fail("Expected exception");
+        }
+        assertTrue(failure.getCause() instanceof AuthenticationException,
+                "Found exception: " + failure.getCause());
+    }
+
+    @Test
+    public void testAuthorizationServerReturnsIncorrectIssuerInOpenidConnectConfiguration() throws Exception {
+        // issuerThatFails serves an openid-configuration whose issuer differs from the token's issuer.
+        final String token =
+                generateToken(validJwk, issuerThatFails, "superuser", "allowed-audience", 0L, 0L, 10000L);
+        ExecutionException failure = null;
+        try {
+            provider.authenticateAsync(new AuthenticationDataCommand(token)).get();
+        } catch (ExecutionException e) {
+            failure = e;
+        }
+        if (failure == null) {
+            fail("Expected exception");
+        }
+        assertTrue(failure.getCause() instanceof AuthenticationException,
+                "Found exception: " + failure.getCause());
+    }
+
+    @Test
+    public void testTokenWithInvalidAudience() throws Exception {
+        // The provider is configured with "allowed-audience" only, so this audience must be rejected.
+        final String token = generateToken(validJwk, issuer, "superuser", "invalid-audience", 0L, 0L, 10000L);
+        ExecutionException failure = null;
+        try {
+            provider.authenticateAsync(new AuthenticationDataCommand(token)).get();
+        } catch (ExecutionException e) {
+            failure = e;
+        }
+        if (failure == null) {
+            fail("Expected exception");
+        }
+        assertTrue(failure.getCause() instanceof AuthenticationException,
+                "Found exception: " + failure.getCause());
+    }
+
+    @Test
+    public void testTokenWithInvalidIssuer() throws Exception {
+        // This issuer is not among the allowed token issuers configured in beforeClass.
+        final String unknownIssuer = "https://not-an-allowed-issuer.com";
+        final String token =
+                generateToken(validJwk, unknownIssuer, "superuser", "allowed-audience", 0L, 0L, 10000L);
+        ExecutionException failure = null;
+        try {
+            provider.authenticateAsync(new AuthenticationDataCommand(token)).get();
+        } catch (ExecutionException e) {
+            failure = e;
+        }
+        if (failure == null) {
+            fail("Expected exception");
+        }
+        assertTrue(failure.getCause() instanceof AuthenticationException,
+                "Found exception: " + failure.getCause());
+    }
+
+    /**
+     * With KUBERNETES_DISCOVER_TRUSTED_ISSUER and no allowed token issuers configured, a token whose issuer
+     * equals the one discovered through the kube client is accepted, and a token with any other issuer is
+     * rejected with an "Issuer not allowed" error.
+     */
+    @Test
+    public void testKubernetesApiServerAsDiscoverTrustedIssuerSuccess() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName()));
+        Properties props = conf.getProperties();
+        props.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "KUBERNETES_DISCOVER_TRUSTED_ISSUER");
+        // Test requires that k8sIssuer is not in the allowed token issuers
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
+
+        // Local provider with its own configuration; it shadows the shared provider field in this method.
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        provider.initialize(conf);
+
+        String role = "superuser";
+        // We use the normal issuer on the token because the /k8s endpoint is configured via the kube config file
+        // made as part of the test setup. The kube client then gets the issuer from the /k8s endpoint and discovers
+        // this issuer.
+        String token = generateToken(validJwk, issuer, role, "allowed-audience", 0L, 0L, 10000L);
+        assertEquals(role, provider.authenticateAsync(new AuthenticationDataCommand(token)).get());
+
+        // Ensure that a subsequent token with a different issuer still fails due to invalid issuer exception
+        String token2 = generateToken(validJwk, "http://not-the-k8s-issuer", role, "allowed-audience", 0L, 0L, 10000L);
+        try {
+            provider.authenticateAsync(new AuthenticationDataCommand(token2)).get();
+            fail("Expected exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof AuthenticationException, "Found exception: " + e.getCause());
+            // Report the message that is actually being checked (the cause's), not the wrapper's.
+            assertTrue(e.getCause().getMessage().contains("Issuer not allowed"),
+                    "Unexpected error message: " + e.getCause().getMessage());
+        }
+    }
+
+    /**
+     * With KUBERNETES_DISCOVER_TRUSTED_ISSUER enabled and an empty allowed-issuer list, a token whose issuer
+     * claim is "http://not-the-k8s-issuer" must fail authentication with an AuthenticationException.
+     */
+    @Test
+    public void testKubernetesApiServerAsDiscoverTrustedIssuerFailsDueToMismatchedIssuerClaim() throws Exception {
+        ServiceConfiguration brokerConf = new ServiceConfiguration();
+        brokerConf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName()));
+        brokerConf.setAuthenticationEnabled(true);
+
+        Properties opts = brokerConf.getProperties();
+        opts.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "KUBERNETES_DISCOVER_TRUSTED_ISSUER");
+        opts.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
+        opts.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        opts.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+
+        AuthenticationProviderOpenID k8sProvider = new AuthenticationProviderOpenID();
+        k8sProvider.initialize(brokerConf);
+
+        String role = "superuser";
+        String foreignToken =
+                generateToken(validJwk, "http://not-the-k8s-issuer", role, "allowed-audience", 0L, 0L, 10000L);
+        try {
+            k8sProvider.authenticateAsync(new AuthenticationDataCommand(foreignToken)).get();
+            fail("Expected exception");
+        } catch (ExecutionException wrapped) {
+            Throwable cause = wrapped.getCause();
+            assertTrue(cause instanceof AuthenticationException, "Found exception: " + cause);
+        }
+    }
+
+
+    /**
+     * Verifies the KUBERNETES_DISCOVER_PUBLIC_KEYS fallback mode when no allowed token issuers are configured:
+     * a token carrying the test's normal issuer authenticates, and a token from any other issuer is rejected
+     * with an "Issuer not allowed" error.
+     */
+    @Test
+    public void testKubernetesApiServerAsDiscoverPublicKeySuccess() throws Exception {
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName()));
+        Properties props = conf.getProperties();
+        props.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "KUBERNETES_DISCOVER_PUBLIC_KEYS");
+        // Test requires that k8sIssuer is not in the allowed token issuers
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
+
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        provider.initialize(conf);
+
+        String role = "superuser";
+        String token = generateToken(validJwk, issuer, role, "allowed-audience", 0L, 0L, 10000L);
+        // Actual value first, expected value second, like the other assertEquals calls in this class.
+        assertEquals(provider.authenticateAsync(new AuthenticationDataCommand(token)).get(), role);
+
+        // Ensure that a subsequent token with a different issuer still fails due to invalid issuer exception
+        String token2 = generateToken(validJwk, "http://not-the-k8s-issuer", role, "allowed-audience", 0L, 0L, 10000L);
+        try {
+            provider.authenticateAsync(new AuthenticationDataCommand(token2)).get();
+            fail("Expected exception");
+        } catch (ExecutionException e) {
+            assertTrue(e.getCause() instanceof AuthenticationException, "Found exception: " + e.getCause());
+            // Report the message that is actually inspected (the cause's), not the ExecutionException wrapper's.
+            assertTrue(e.getCause().getMessage().contains("Issuer not allowed"),
+                    "Unexpected error message: " + e.getCause().getMessage());
+        }
+    }
+
+    /**
+     * With KUBERNETES_DISCOVER_PUBLIC_KEYS enabled and an empty allowed-issuer list, a token whose issuer
+     * claim is "http://not-the-k8s-issuer" must fail authentication with an AuthenticationException.
+     */
+    @Test
+    public void testKubernetesApiServerAsDiscoverPublicKeyFailsDueToMismatchedIssuerClaim() throws Exception {
+        ServiceConfiguration brokerConf = new ServiceConfiguration();
+        brokerConf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName()));
+        brokerConf.setAuthenticationEnabled(true);
+
+        Properties opts = brokerConf.getProperties();
+        opts.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "KUBERNETES_DISCOVER_PUBLIC_KEYS");
+        opts.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
+        opts.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        opts.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+
+        AuthenticationProviderOpenID k8sProvider = new AuthenticationProviderOpenID();
+        k8sProvider.initialize(brokerConf);
+
+        String role = "superuser";
+        String foreignToken =
+                generateToken(validJwk, "http://not-the-k8s-issuer", role, "allowed-audience", 0L, 0L, 10000L);
+        try {
+            k8sProvider.authenticateAsync(new AuthenticationDataCommand(foreignToken)).get();
+            fail("Expected exception");
+        } catch (ExecutionException wrapped) {
+            Throwable cause = wrapped.getCause();
+            assertTrue(cause instanceof AuthenticationException, "Found exception: " + cause);
+        }
+    }
+
+    /**
+     * A token that expires in the future is accepted by a fresh authentication state: authenticateAsync
+     * yields null, and the state then reports the token's role, the raw token, and a non-expired status.
+     */
+    @Test
+    public void testAuthenticationStateOpenIDForValidToken() throws Exception {
+        String expectedRole = "superuser";
+        String jwt = generateToken(validJwk, issuer, expectedRole, "allowed-audience", 0L, 0L, 10000L);
+        AuthenticationState authState = provider.newAuthState(null, null, null);
+
+        AuthData clientData = AuthData.of(jwt.getBytes());
+        AuthData response = authState.authenticateAsync(clientData).get();
+
+        assertNull(response);
+        assertEquals(authState.getAuthRole(), expectedRole);
+        assertEquals(authState.getAuthDataSource().getCommandData(), jwt);
+        assertFalse(authState.isExpired());
+    }
+
+    /**
+     * A token whose exp claim lies 10 seconds in the past must be rejected by the authentication state
+     * with an AuthenticationException.
+     */
+    @Test
+    public void testAuthenticationStateOpenIDForExpiredToken() throws Exception {
+        String expiredJwt = generateToken(validJwk, issuer, "superuser", "allowed-audience", 0L, 0L, -10000L);
+        AuthenticationState authState = provider.newAuthState(null, null, null);
+        AuthData clientData = AuthData.of(expiredJwt.getBytes());
+        try {
+            authState.authenticateAsync(clientData).get();
+            fail("Expected exception");
+        } catch (ExecutionException wrapped) {
+            Throwable cause = wrapped.getCause();
+            assertTrue(cause instanceof AuthenticationException, "Found exception: " + cause);
+        }
+    }
+
+    /**
+     * A token generated with a null exp offset must be rejected by the authentication state with an
+     * AuthenticationException. Despite the "ValidToken" in the method name, this is a failure-path test.
+     */
+    @Test
+    public void testAuthenticationStateOpenIDForValidTokenWithNoExp() throws Exception {
+        String jwtWithoutExp = generateToken(validJwk, issuer, "superuser", "allowed-audience", 0L, 0L, null);
+        AuthenticationState authState = provider.newAuthState(null, null, null);
+        AuthData clientData = AuthData.of(jwtWithoutExp.getBytes());
+        try {
+            authState.authenticateAsync(clientData).get();
+            fail("Expected exception");
+        } catch (ExecutionException wrapped) {
+            Throwable cause = wrapped.getCause();
+            assertTrue(cause instanceof AuthenticationException, "Found exception: " + cause);
+        }
+    }
+
+    /**
+     * Configures a 10 second leeway so that a token generated with an exp offset of zero still passes
+     * verification, then checks that the resulting authentication state reports itself as expired.
+     */
+    @Test
+    public void testAuthenticationStateOpenIDForTokenExpiration() throws Exception {
+        ServiceConfiguration brokerConf = new ServiceConfiguration();
+        brokerConf.setAuthenticationProviders(Set.of(AuthenticationProviderOpenID.class.getName()));
+        brokerConf.setAuthenticationEnabled(true);
+
+        Properties opts = brokerConf.getProperties();
+        // Use the leeway to allow the token to pass validation and then fail expiration
+        opts.setProperty(AuthenticationProviderOpenID.ACCEPTED_TIME_LEEWAY_SECONDS, "10");
+        opts.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
+        opts.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        opts.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+
+        AuthenticationProviderOpenID leewayProvider = new AuthenticationProviderOpenID();
+        leewayProvider.initialize(brokerConf);
+
+        String expectedRole = "superuser";
+        String jwt = generateToken(validJwk, issuer, expectedRole, "allowed-audience", 0L, 0L, 0L);
+        AuthenticationState authState = leewayProvider.newAuthState(null, null, null);
+        AuthData response = authState.authenticateAsync(AuthData.of(jwt.getBytes())).get();
+
+        assertNull(response);
+        assertEquals(authState.getAuthRole(), expectedRole);
+        assertEquals(authState.getAuthDataSource().getCommandData(), jwt);
+        assertTrue(authState.isExpired());
+    }
+
+    /**
+     * When ROLE_CLAIM is set to "test", the authenticated role must come from the token's "test" claim
+     * ("my-role") and not from its subject ("not-my-role").
+     */
+    @Test
+    void ensureRoleClaimForNonSubClaimReturnsRole() throws Exception {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "test");
+        props.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        provider.initialize(config);
+
+        // Build a JWT with a custom claim. The diamond avoids the raw-type (unchecked) HashMap construction.
+        HashMap<String, Object> claims = new HashMap<>();
+        claims.put("test", "my-role");
+        String token = generateToken(validJwk, issuer, "not-my-role", "allowed-audience", 0L,
+                0L, 10000L, claims);
+        assertEquals(provider.authenticateAsync(new AuthenticationDataCommand(token)).get(), "my-role");
+    }
+
+    /**
+     * When ROLE_CLAIM is set to "test" and the token carries no "test" claim, authentication must fail
+     * with an AuthenticationException.
+     */
+    @Test
+    void ensureRoleClaimForNonSubClaimFailsWhenClaimIsMissing() throws Exception {
+        Properties opts = new Properties();
+        opts.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "test");
+        opts.setProperty(AuthenticationProviderOpenID.REQUIRE_HTTPS, "false");
+        opts.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "allowed-audience");
+        opts.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, issuer);
+
+        ServiceConfiguration brokerConf = new ServiceConfiguration();
+        brokerConf.setProperties(opts);
+        AuthenticationProviderOpenID customClaimProvider = new AuthenticationProviderOpenID();
+        customClaimProvider.initialize(brokerConf);
+
+        // Build a JWT without the "test" claim, which should cause the authentication to fail
+        String jwtMissingClaim =
+                generateToken(validJwk, issuer, "not-my-role", "allowed-audience", 0L, 0L, 10000L);
+        try {
+            customClaimProvider.authenticateAsync(new AuthenticationDataCommand(jwtMissingClaim)).get();
+            fail("Expected exception");
+        } catch (ExecutionException wrapped) {
+            Throwable cause = wrapped.getCause();
+            assertTrue(cause instanceof AuthenticationException, "Found exception: " + cause);
+        }
+    }
+
+    // Counterintuitive but intentional: the state object itself accepts a second token with a different role.
+    // Rejecting the role change is the job of the ServerCnx handling of the state object, so the state object
+    // must allow the role to change.
+    @Test
+    public void testAuthenticationStateOpenIDAllowsRoleChange() throws Exception {
+        String firstRole = "superuser";
+        String secondRole = "otheruser";
+        String firstJwt = generateToken(validJwk, issuer, firstRole, "allowed-audience", 0L, 0L, 10000L);
+        String secondJwt = generateToken(validJwk, issuer, secondRole, "allowed-audience", 0L, 0L, 10000L);
+        AuthenticationState authState = provider.newAuthState(null, null, null);
+
+        // First authentication establishes the initial role.
+        AuthData firstResponse = authState.authenticateAsync(AuthData.of(firstJwt.getBytes())).get();
+        assertNull(firstResponse);
+        assertEquals(authState.getAuthRole(), firstRole);
+        assertEquals(authState.getAuthDataSource().getCommandData(), firstJwt);
+        assertFalse(authState.isExpired());
+
+        // Second authentication on the same state switches to the other role.
+        AuthData secondResponse = authState.authenticateAsync(AuthData.of(secondJwt.getBytes())).get();
+        assertNull(secondResponse);
+        assertEquals(authState.getAuthRole(), secondRole);
+        assertEquals(authState.getAuthDataSource().getCommandData(), secondJwt);
+        assertFalse(authState.isExpired());
+    }
+
+    /**
+     * Convenience overload that generates a token without any extra claims by delegating to the
+     * eight-argument variant with an empty map.
+     */
+    private String generateToken(String kid, String issuer, String subject, String audience,
+                                 Long iatOffset, Long nbfOffset, Long expOffset) {
+        HashMap<String, Object> noExtraClaims = new HashMap<>();
+        return generateToken(kid, issuer, subject, audience, iatOffset, nbfOffset, expOffset, noExtraClaims);
+    }
+
+    /**
+     * Builds a compact JWT signed with {@code privateKey}. The kid, typ ("JWT") and alg ("RS256") header
+     * parameters are set explicitly. Each of iat, nbf and exp is set to the current time plus the matching
+     * offset in milliseconds; a null offset passes null to the corresponding builder setter.
+     * The extra claims are added after the standard claims and before signing.
+     */
+    private String generateToken(String kid, String issuer, String subject, String audience,
+                                 Long iatOffset, Long nbfOffset, Long expOffset, HashMap<String, Object> extraClaims) {
+        long nowMillis = System.currentTimeMillis();
+        Date issuedAt = iatOffset == null ? null : new Date(nowMillis + iatOffset);
+        Date notBefore = nbfOffset == null ? null : new Date(nowMillis + nbfOffset);
+        Date expiration = expOffset == null ? null : new Date(nowMillis + expOffset);
+        return new DefaultJwtBuilder()
+                .setHeaderParam("kid", kid)
+                .setHeaderParam("typ", "JWT")
+                .setHeaderParam("alg", "RS256")
+                .setIssuer(issuer)
+                .setSubject(subject)
+                .setAudience(audience)
+                .setIssuedAt(issuedAt)
+                .setNotBefore(notBefore)
+                .setExpiration(expiration)
+                .addClaims(extraClaims)
+                .signWith(privateKey)
+                .compact();
+    }
+
+}
diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
new file mode 100644
index 00000000000..74abffe9c38
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationProviderOpenIDTest.java
@@ -0,0 +1,387 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static org.testng.Assert.assertNull;
+import com.auth0.jwt.JWT;
+import com.auth0.jwt.interfaces.DecodedJWT;
+import io.jsonwebtoken.SignatureAlgorithm;
+import io.jsonwebtoken.impl.DefaultJwtBuilder;
+import io.jsonwebtoken.security.Keys;
+import java.security.KeyPair;
+import java.sql.Date;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Properties;
+import java.util.Set;
+import javax.naming.AuthenticationException;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.apache.pulsar.broker.authentication.AuthenticationDataCommand;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests to cover the AuthenticationProviderOpenID without any network calls.
+ * <p>
+ * This class only tests the verification of tokens. It does not test the integration to retrieve tokens
+ * from an identity provider. See {@link AuthenticationProviderOpenIDIntegrationTest} for the authorization
+ * server integration tests.
+ * <p>
+ * Note: this class uses the io.jsonwebtoken library here because it has more utilities than the auth0 library.
+ * The jsonwebtoken library makes it easy to generate key pairs for many algorithms, and it also has an enum
+ * that can be used to assert that unsupported algorithms properly fail validation.
+ */
+public class AuthenticationProviderOpenIDTest {
+
+    // https://www.rfc-editor.org/rfc/rfc7518#section-3.1
+    /** The signature algorithms that the tests below expect {@code verifyJWT} to accept. */
+    @DataProvider(name = "supportedAlgorithms")
+    public static Object[][] supportedAlgorithms() {
+        return new Object[][] {
+                { SignatureAlgorithm.RS256 },
+                { SignatureAlgorithm.RS384 },
+                { SignatureAlgorithm.RS512 },
+                { SignatureAlgorithm.ES256 },
+                { SignatureAlgorithm.ES384 },
+                { SignatureAlgorithm.ES512 }
+        };
+    }
+
+    // Provider to use in common tests that are not verifying the configuration of the provider itself.
+    AuthenticationProviderOpenID basicProvider;
+    final String basicProviderAudience = "my-special-audience";
+
+    /** Initializes {@link #basicProvider} once with a single allowed audience and a single allowed issuer. */
+    @BeforeClass
+    public void setup() throws Exception {
+        Properties properties = new Properties();
+        properties.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, basicProviderAudience);
+        properties.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://my-issuer.com");
+        ServiceConfiguration conf = new ServiceConfiguration();
+        conf.setProperties(properties);
+        basicProvider = new AuthenticationProviderOpenID();
+        basicProvider.initialize(conf);
+    }
+
+    /** Authenticating auth data that wraps a null token must throw an AuthenticationException. */
+    @Test
+    public void testNullToken() {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Assert.assertThrows(AuthenticationException.class,
+                () -> provider.authenticate(new AuthenticationDataCommand(null)));
+    }
+
+    /** Verifying with a null algorithm name must throw an AuthenticationException. */
+    @Test
+    public void testThatNullAlgFails() {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Assert.assertThrows(AuthenticationException.class,
+                () -> provider.verifyJWT(null, null, null));
+    }
+
+    /**
+     * Every {@link SignatureAlgorithm} value that is not listed by {@link #supportedAlgorithms()} must be
+     * rejected with an AuthenticationException.
+     */
+    @Test
+    public void testThatUnsupportedAlgsThrowExceptions() {
+        // Start from all algorithms and remove the supported ones; what remains is unsupported.
+        Set<SignatureAlgorithm> unsupportedAlgs = new HashSet<>(Set.of(SignatureAlgorithm.values()));
+        Arrays.stream(supportedAlgorithms()).map(o -> (SignatureAlgorithm) o[0])
+                .forEach(unsupportedAlgs::remove);
+        unsupportedAlgs.forEach(unsupportedAlg -> {
+            AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+            // We don't create a public key because it's irrelevant
+            Assert.assertThrows(AuthenticationException.class,
+                    () -> provider.verifyJWT(null, unsupportedAlg.getValue(), null));
+        });
+    }
+
+    /** A token signed with a key pair for a supported algorithm must verify and be returned unchanged. */
+    @Test(dataProvider = "supportedAlgorithms")
+    public void testThatSupportedAlgsWork(SignatureAlgorithm alg) throws AuthenticationException {
+        KeyPair keyPair = Keys.keyPairFor(alg);
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
+        defaultJwtBuilder.signWith(keyPair.getPrivate());
+
+        // Convert to the right class
+        DecodedJWT expectedValue = JWT.decode(defaultJwtBuilder.compact());
+        DecodedJWT actualValue = basicProvider.verifyJWT(keyPair.getPublic(), alg.getValue(), expectedValue);
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(actualValue, expectedValue);
+    }
+
+    /** An RS256-signed token must not verify when the caller claims an algorithm from the ES family. */
+    @Test
+    public void testThatSupportedAlgWithMismatchedPublicKeyFromDifferentAlgFamilyFails() {
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
+        defaultJwtBuilder.signWith(keyPair.getPrivate());
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+        // Choose a different algorithm from a different alg family
+        Assert.assertThrows(AuthenticationException.class,
+                () -> provider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.ES512.getValue(), jwt));
+    }
+
+    /** An RS256-signed token must not verify when the caller claims RS512, even though both are RSA. */
+    @Test
+    public void testThatSupportedAlgWithMismatchedPublicKeyFromSameAlgFamilyFails() {
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
+        defaultJwtBuilder.signWith(keyPair.getPrivate());
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+        // Choose a different algorithm but within the same alg family as above
+        Assert.assertThrows(AuthenticationException.class,
+                () -> provider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS512.getValue(), jwt));
+    }
+
+    /** A token whose exp is 60 seconds in the past must fail verification. */
+    @Test
+    public void ensureExpiredTokenFails() {
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
+        // Override the exp set in the above method
+        defaultJwtBuilder.setExpiration(Date.from(Instant.now().minusSeconds(60)));
+        defaultJwtBuilder.signWith(keyPair.getPrivate());
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+        Assert.assertThrows(AuthenticationException.class,
+                () -> basicProvider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS256.getValue(), jwt));
+    }
+
+    /** A token whose nbf is 60 seconds in the future must fail verification. */
+    @Test
+    public void ensureFutureNBFFails() throws Exception {
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
+        // Override the nbf set in the above method
+        defaultJwtBuilder.setNotBefore(Date.from(Instant.now().plusSeconds(60)));
+        defaultJwtBuilder.signWith(keyPair.getPrivate());
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+        Assert.assertThrows(AuthenticationException.class,
+                () -> basicProvider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS256.getValue(), jwt));
+    }
+
+    /** A token whose iat is 60 seconds in the future must fail verification. */
+    @Test
+    public void ensureFutureIATFails() throws Exception {
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        addValidMandatoryClaims(defaultJwtBuilder, basicProviderAudience);
+        // Override the iat set in the above method
+        defaultJwtBuilder.setIssuedAt(Date.from(Instant.now().plusSeconds(60)));
+        defaultJwtBuilder.signWith(keyPair.getPrivate());
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+        Assert.assertThrows(AuthenticationException.class,
+                () -> basicProvider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS256.getValue(), jwt));
+    }
+
+    /** A token that expired 5 seconds ago must still verify when the accepted leeway is 10 seconds. */
+    @Test
+    public void ensureRecentlyExpiredTokenWithinConfiguredLeewaySucceeds() throws Exception {
+        KeyPair keyPair = Keys.keyPairFor(SignatureAlgorithm.RS256);
+
+        // Set up the provider
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ACCEPTED_TIME_LEEWAY_SECONDS, "10");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "leewayAudience");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://localhost:8080");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        provider.initialize(config);
+
+        // Build the JWT with an only recently expired token
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        addValidMandatoryClaims(defaultJwtBuilder, "leewayAudience");
+        defaultJwtBuilder.setExpiration(Date.from(Instant.ofEpochMilli(System.currentTimeMillis() - 5000L)));
+        defaultJwtBuilder.signWith(keyPair.getPrivate());
+        DecodedJWT expectedValue = JWT.decode(defaultJwtBuilder.compact());
+
+        // Test the verification
+        DecodedJWT actualValue = null;
+        try {
+            actualValue = provider.verifyJWT(keyPair.getPublic(), SignatureAlgorithm.RS256.getValue(), expectedValue);
+        } catch (Exception e) {
+            Assert.fail("Token verification should not have thrown an exception.", e);
+        }
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(actualValue, expectedValue);
+    }
+
+    /** An empty allowed-issuers setting with no fallback discovery mode must fail initialization. */
+    @Test
+    public void ensureEmptyIssuersFailsInitialization() {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        Assert.assertThrows(IllegalArgumentException.class, () -> provider.initialize(config));
+    }
+
+    /** An empty allowed-issuers setting with the fallback discovery mode DISABLED must fail initialization. */
+    @Test
+    public void ensureEmptyIssuersFailsInitializationWithDisabledDiscoveryMode() {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
+        props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "DISABLED");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        Assert.assertThrows(IllegalArgumentException.class, () -> provider.initialize(config));
+    }
+
+    /** Empty allowed issuers are acceptable when KUBERNETES_DISCOVER_TRUSTED_ISSUER is the fallback mode. */
+    @Test
+    public void ensureEmptyIssuersWithK8sTrustedIssuerEnabledPassesInitialization() throws Exception {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "my-audience");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
+        props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "KUBERNETES_DISCOVER_TRUSTED_ISSUER");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        provider.initialize(config);
+    }
+
+    /** Empty allowed issuers are acceptable when KUBERNETES_DISCOVER_PUBLIC_KEYS is the fallback mode. */
+    @Test
+    public void ensureEmptyIssuersWithK8sPublicKeyEnabledPassesInitialization() throws Exception {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "my-audience");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "");
+        props.setProperty(AuthenticationProviderOpenID.FALLBACK_DISCOVERY_MODE, "KUBERNETES_DISCOVER_PUBLIC_KEYS");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        provider.initialize(config);
+    }
+
+    /** A configuration that never sets the allowed issuers must fail initialization. */
+    @Test
+    public void ensureNullIssuersFailsInitialization() {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        ServiceConfiguration config = new ServiceConfiguration();
+        // Make sure this still defaults to null.
+        assertNull(config.getProperties().get(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS));
+        Assert.assertThrows(IllegalArgumentException.class, () -> provider.initialize(config));
+    }
+
+    /** A list of allowed issuers that contains an http (non-https) issuer must fail initialization. */
+    @Test
+    public void ensureInsecureIssuerFailsInitialization() {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com,http://myissuer.com");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        Assert.assertThrows(IllegalArgumentException.class, () -> provider.initialize(config));
+    }
+
+    /** A token that has no subject must yield a null role from {@link #basicProvider}. */
+    @Test
+    void ensureMissingRoleClaimReturnsNull() throws Exception {
+        // Build a JWT that only carries an audience; no subject is set
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        defaultJwtBuilder.setAudience(basicProviderAudience);
+        DecodedJWT jwtWithoutSub = JWT.decode(defaultJwtBuilder.compact());
+
+        // A JWT without the role claim must result in a null role
+        assertNull(basicProvider.getRole(jwtWithoutSub));
+    }
+
+    /** With ROLE_CLAIM set to "sub", the role is the token's subject string. */
+    @Test
+    void ensureRoleClaimForStringReturnsRole() throws Exception {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, basicProviderAudience);
+        props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "sub");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        provider.initialize(config);
+
+        // Build a JWT with a string-valued subject
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        defaultJwtBuilder.setSubject("my-role");
+        defaultJwtBuilder.setAudience(basicProviderAudience);
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(provider.getRole(jwt), "my-role");
+    }
+
+    /** With ROLE_CLAIM pointing at a one-element list claim, the role is that single element. */
+    @Test
+    void ensureRoleClaimForSingletonListReturnsRole() throws Exception {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, basicProviderAudience);
+        props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        provider.initialize(config);
+
+        // Build a JWT whose "roles" claim is a single-element list
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        HashMap<String, List<String>> claims = new HashMap<>();
+        claims.put("roles", Collections.singletonList("my-role"));
+        defaultJwtBuilder.setClaims(claims);
+        defaultJwtBuilder.setAudience(basicProviderAudience);
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(provider.getRole(jwt), "my-role");
+    }
+
+    /** With ROLE_CLAIM pointing at a multi-element list claim, the role is the first element. */
+    @Test
+    void ensureRoleClaimForMultiEntryListReturnsFirstRole() throws Exception {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, basicProviderAudience);
+        props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        provider.initialize(config);
+
+        // Build a JWT whose "roles" claim is a two-element list
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        HashMap<String, List<String>> claims = new HashMap<>();
+        claims.put("roles", Arrays.asList("my-role-1", "my-role-2"));
+        defaultJwtBuilder.setClaims(claims);
+        defaultJwtBuilder.setAudience(basicProviderAudience);
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+
+        // TestNG's assertEquals takes (actual, expected).
+        Assert.assertEquals(provider.getRole(jwt), "my-role-1");
+    }
+
+    /** With ROLE_CLAIM pointing at an empty list claim, the role is null. */
+    @Test
+    void ensureRoleClaimForEmptyListReturnsNull() throws Exception {
+        AuthenticationProviderOpenID provider = new AuthenticationProviderOpenID();
+        Properties props = new Properties();
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_TOKEN_ISSUERS, "https://myissuer.com");
+        props.setProperty(AuthenticationProviderOpenID.ALLOWED_AUDIENCES, "no-role-audience-test");
+        props.setProperty(AuthenticationProviderOpenID.ROLE_CLAIM, "roles");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        provider.initialize(config);
+
+        // Build a JWT whose "roles" claim is an empty list
+        DefaultJwtBuilder defaultJwtBuilder = new DefaultJwtBuilder();
+        HashMap<String, List<String>> claims = new HashMap<>();
+        claims.put("roles", Collections.emptyList());
+        defaultJwtBuilder.setClaims(claims);
+        defaultJwtBuilder.setAudience("no-role-audience-test");
+        DecodedJWT jwt = JWT.decode(defaultJwtBuilder.compact());
+
+        // A JWT with an empty list role claim must result in a null role
+        assertNull(provider.getRole(jwt));
+    }
+
+    // Method simplifies adding the required claims. For the tests that need to verify invalid values for these
+    // claims, it is sufficient to set the values after calling this method.
+    private void addValidMandatoryClaims(DefaultJwtBuilder defaultJwtBuilder, String audience) {
+        defaultJwtBuilder.setExpiration(Date.from(Instant.now().plusSeconds(60)));
+        defaultJwtBuilder.setNotBefore(Date.from(Instant.now()));
+        defaultJwtBuilder.setIssuedAt(Date.from(Instant.now()));
+        defaultJwtBuilder.setAudience(audience);
+        defaultJwtBuilder.setSubject("my-role");
+    }
+}
diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationStateOpenIDTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationStateOpenIDTest.java
new file mode 100644
index 00000000000..c0945ae43c3
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/AuthenticationStateOpenIDTest.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertThrows;
+import static org.testng.AssertJUnit.fail;
+import javax.naming.AuthenticationException;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests primarily focused on failure scenarios for {@link AuthenticationStateOpenID}. The happy path is covered
+ * by {@link AuthenticationProviderOpenIDIntegrationTest}.
+ */
+public class AuthenticationStateOpenIDTest {
+
+    /**
+     * A freshly created state is not complete, and asking it for the role must be rejected with an
+     * {@link AuthenticationException}.
+     */
+    @Test
+    void getRoleOnAuthStateShouldFailIfNotAuthenticated() {
+        AuthenticationStateOpenID unauthenticated = new AuthenticationStateOpenID(null, null, null);
+        assertFalse(unauthenticated.isComplete());
+        assertThrows(AuthenticationException.class, () -> unauthenticated.getAuthRole());
+    }
+
+    /**
+     * A freshly created state has no authentication data source to hand out.
+     */
+    @Test
+    void getAuthDataOnAuthStateShouldBeNullIfNotAuthenticated() {
+        AuthenticationStateOpenID unauthenticated = new AuthenticationStateOpenID(null, null, null);
+        assertNull(unauthenticated.getAuthDataSource());
+    }
+
+    // The synchronous authenticate method is overridden to make it clear that this provider is only meant to be
+    // used asynchronously, so calling it must be rejected with a "Not supported" error.
+    @SuppressWarnings("deprecation")
+    @Test
+    void authenticateShouldThrowNotImplementedException() {
+        AuthenticationStateOpenID authState = new AuthenticationStateOpenID(null, null, null);
+        AuthenticationException rejection = null;
+        try {
+            authState.authenticate(null);
+        } catch (AuthenticationException e) {
+            rejection = e;
+        }
+        // Reaching this point without a captured exception means the call unexpectedly succeeded.
+        if (rejection == null) {
+            fail("Expected AuthenticationException to be thrown");
+        }
+        assertEquals(rejection.getMessage(), "Not supported");
+    }
+}
diff --git a/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/ConfigUtilsTest.java b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/ConfigUtilsTest.java
new file mode 100644
index 00000000000..ad06d4b0c10
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/test/java/org/apache/pulsar/broker/authentication/oidc/ConfigUtilsTest.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.broker.authentication.oidc;
+
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertFalse;
+import static org.testng.Assert.assertNull;
+import static org.testng.Assert.assertTrue;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.Set;
+import org.apache.pulsar.broker.ServiceConfiguration;
+import org.testng.annotations.Test;
+
+/**
+ * Unit tests for the {@link ConfigUtils} helpers, exercised against values stored in the properties of a
+ * {@link ServiceConfiguration}.
+ *
+ * <p>Assertions follow the TestNG argument order, {@code assertEquals(actual, expected)}, so that a failing
+ * assertion reports the expected and the actual value the right way round.
+ */
+public class ConfigUtilsTest {
+
+    /** A property that is present is returned as-is. */
+    @Test
+    public void testGetConfigValueAsStringWorks() {
+        Properties props = new Properties();
+        props.setProperty("prop1", "audience");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        String actual = ConfigUtils.getConfigValueAsString(config, "prop1");
+        assertEquals(actual, "audience");
+    }
+
+    /** Without a default, a missing property yields {@code null}. */
+    @Test
+    public void testGetConfigValueAsStringReturnsNullIfMissing() {
+        Properties props = new Properties();
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        String actual = ConfigUtils.getConfigValueAsString(config, "prop1");
+        assertNull(actual);
+    }
+
+    /** A property that is present wins over the supplied default. */
+    @Test
+    public void testGetConfigValueAsStringWithDefaultWorks() {
+        Properties props = new Properties();
+        props.setProperty("prop1", "audience");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        String actual = ConfigUtils.getConfigValueAsString(config, "prop1", "default");
+        assertEquals(actual, "audience");
+    }
+
+    /** A missing property falls back to the supplied default. */
+    @Test
+    public void testGetConfigValueAsStringReturnsDefaultIfMissing() {
+        Properties props = new Properties();
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        String actual = ConfigUtils.getConfigValueAsString(config, "prop1", "default");
+        assertEquals(actual, "default");
+    }
+
+    /** A comma separated value is split into its elements. */
+    @Test
+    public void testGetConfigValueAsSetReturnsWorks() {
+        Properties props = new Properties();
+        props.setProperty("prop1", "a, b,   c");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        Set<String> actual = ConfigUtils.getConfigValueAsSet(config, "prop1");
+        // Trims all whitespace
+        assertEquals(actual, Set.of("a", "b", "c"));
+    }
+
+    /** A missing property yields an empty set. */
+    @Test
+    public void testGetConfigValueAsSetReturnsEmptySetIfMissing() {
+        Properties props = new Properties();
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        Set<String> actual = ConfigUtils.getConfigValueAsSet(config, "prop1");
+        assertEquals(actual, Collections.emptySet());
+    }
+
+    /** A value that contains only whitespace yields an empty set. */
+    @Test
+    public void testGetConfigValueAsSetReturnsEmptySetIfBlankString() {
+        Properties props = new Properties();
+        props.setProperty("prop1", " ");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        Set<String> actual = ConfigUtils.getConfigValueAsSet(config, "prop1");
+        assertEquals(actual, Collections.emptySet());
+    }
+
+    /** A numeric property is parsed and returned instead of the default. */
+    @Test
+    public void testGetConfigValueAsIntegerWorks() {
+        Properties props = new Properties();
+        props.setProperty("prop1", "1234");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        int actual = ConfigUtils.getConfigValueAsInt(config, "prop1", 9);
+        assertEquals(actual, 1234);
+    }
+
+    /** A property that is not a number falls back to the supplied default. */
+    @Test
+    public void testGetConfigValueAsIntegerReturnsDefaultIfNAN() {
+        Properties props = new Properties();
+        props.setProperty("prop1", "non-a-number");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        int actual = ConfigUtils.getConfigValueAsInt(config, "prop1", 9);
+        assertEquals(actual, 9);
+    }
+
+    /** A missing integer property falls back to the supplied default. */
+    @Test
+    public void testGetConfigValueAsIntegerReturnsDefaultIfMissingProp() {
+        Properties props = new Properties();
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        int actual = ConfigUtils.getConfigValueAsInt(config, "prop1", 10);
+        assertEquals(actual, 10);
+    }
+
+    /** A missing boolean property falls back to the supplied default, whichever value that is. */
+    @Test
+    public void testGetConfigValueAsBooleanReturnsDefaultIfMissingProp() {
+        Properties props = new Properties();
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        boolean actualFalse = ConfigUtils.getConfigValueAsBoolean(config, "prop1", false);
+        assertFalse(actualFalse);
+        boolean actualTrue = ConfigUtils.getConfigValueAsBoolean(config, "prop1", true);
+        assertTrue(actualTrue);
+    }
+
+    /** A property set to "true" is returned as {@code true} even though the default is {@code false}. */
+    @Test
+    public void testGetConfigValueAsBooleanWorks() {
+        Properties props = new Properties();
+        props.setProperty("prop1", "true");
+        ServiceConfiguration config = new ServiceConfiguration();
+        config.setProperties(props);
+        boolean actual = ConfigUtils.getConfigValueAsBoolean(config, "prop1", false);
+        assertTrue(actual);
+    }
+
+}
diff --git a/pulsar-broker-auth-oidc/src/test/java/resources/fakeKubeConfig.yaml b/pulsar-broker-auth-oidc/src/test/java/resources/fakeKubeConfig.yaml
new file mode 100644
index 00000000000..ef3d373399d
--- /dev/null
+++ b/pulsar-broker-auth-oidc/src/test/java/resources/fakeKubeConfig.yaml
@@ -0,0 +1,37 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+
+current-context: wire-mock-server  # selects the context of the same name listed under "contexts"
+apiVersion: v1
+clusters:
+- cluster:
+    api-version: v1
+    # NOTE(review): ${WIRE_MOCK_PORT} looks like a placeholder that the tests replace with the port of a
+    # locally running mock server before this file is used - confirm against the test that loads it.
+    server: http://localhost:${WIRE_MOCK_PORT}/k8s
+  name: wire-mock-server
+contexts:
+- context:
+    cluster: wire-mock-server
+    namespace: pulsar
+    user: test-user
+  name: wire-mock-server
+kind: Config
+users:
+- name: test-user
+  user:
+    token: fake-token  # dummy value for tests, not a real credential
\ No newline at end of file