You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:39:05 UTC

[pulsar] 09/14: [client authentication] add authentication client with oauth2 support (#7420)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit c86fe38266eb3181a9942993cecff65fdee1a511
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Thu Jul 2 01:41:31 2020 -0500

    [client authentication] add authentication client with oauth2 support (#7420)
    
    ### Motivation
    
    Pulsar supports authenticating clients using OAuth 2.0 access tokens. You can use tokens to identify a Pulsar client and associate with some "principal" (or "role") that is permitted to do some actions (eg: publish to a topic or consume from a topic).
    
    This module is to support Pulsar Client Authentication Plugin for OAuth 2.0 directly. Client side communicate with Oauth 2.0 server,  then the client will get an `access token` from Oauth 2.0 server, and will pass this `access token` to Pulsar broker to do the authentication.
    
    So the Broker side could still use `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`,
    also user can add their own `AuthenticationProvider` to work with this module.
    
    ### Modifications
    
    - add related code;
    - add related test;
    - add related doc.
    
    The init of this client authentication module would be like:
    ```java
    Authentication oauth2Authentication = AuthenticationFactoryOAuth2.clientCredentials(
                    new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"),
                    new URL("file:///path/to/credential/file.json"),  // key file path
                    "https://dev-kt-aa9ne.us.auth0.com/api/v2/"
            );
    PulsarClient client = PulsarClient.builder()
        .serviceUrl("pulsar://broker.example.com:6650/")
        .authentication(oauth2Authentication)
        .build();
    ```
    
    ### Verifying this change
    
    tests passed.
    
    (cherry picked from commit 768813ea595697e317a0ac5af53191e6cf832766)
---
 .../TokenAuthenticatedProducerConsumerTest.java    | 143 ++++++++++++++++++
 ...kenOauth2AuthenticatedProducerConsumerTest.java | 160 +++++++++++++++++++++
 .../authentication/token/credentials_file.json     |   4 +
 .../impl/auth/oauth2/AuthenticationDataOAuth2.java |  60 ++++++++
 .../auth/oauth2/AuthenticationFactoryOAuth2.java   |  47 ++++++
 .../impl/auth/oauth2/AuthenticationOAuth2.java     | 131 +++++++++++++++++
 .../impl/auth/oauth2/ClientCredentialsFlow.java    | 146 +++++++++++++++++++
 .../pulsar/client/impl/auth/oauth2/Flow.java       |  47 ++++++
 .../pulsar/client/impl/auth/oauth2/FlowBase.java   |  80 +++++++++++
 .../pulsar/client/impl/auth/oauth2/KeyFile.java    |  66 +++++++++
 .../pulsar/client/impl/auth/oauth2/Readme.md       |  94 ++++++++++++
 .../client/impl/auth/oauth2/package-info.java      |  19 +++
 .../protocol/ClientCredentialsExchangeRequest.java |  42 ++++++
 .../protocol/ClientCredentialsExchanger.java       |  41 ++++++
 .../oauth2/protocol/DefaultMetadataResolver.java   | 105 ++++++++++++++
 .../client/impl/auth/oauth2/protocol/Metadata.java |  54 +++++++
 .../auth/oauth2/protocol/MetadataResolver.java     |  28 ++++
 .../impl/auth/oauth2/protocol/TokenClient.java     | 121 ++++++++++++++++
 .../impl/auth/oauth2/protocol/TokenError.java      |  41 ++++++
 .../oauth2/protocol/TokenExchangeException.java    |  35 +++++
 .../impl/auth/oauth2/protocol/TokenResult.java     |  51 +++++++
 .../impl/auth/oauth2/protocol/package-info.java    |  19 +++
 .../impl/auth/oauth2/AuthenticationOAuth2Test.java | 122 ++++++++++++++++
 .../pulsar/client/impl/auth/oauth2/MockClock.java  |  97 +++++++++++++
 24 files changed, 1753 insertions(+)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
new file mode 100644
index 0000000..874a34d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test Token authentication with:
+ *    client: org.apache.pulsar.client.impl.auth.AuthenticationToken
+ *    broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+ */
+public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);
+
+    // pre-create a public/private_key pair.  Public key used for broker to verify client passed in token
+    private final String TOKEN_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAhHKgdY6arG7eE75bUPtznN5WjMu0sxLq7pI5Aaiw2Ijerbz33iO/Fdd2yJVuAZNDZPD/AVSaeliEh/BP+s2rN8KNuiywD+SlL1NGf2JDS5BvGT4Q8eHfDDRd/iY5zkK58wYwlke6C8fKCx10MTH9iYAJpzaaxs+Tu1RaatK+691aYSiMkYIfgbqAKmSCpK+48al/PkmENfuhzaTBPhCnEblhNvUhS5MjzBcAcGzecpEuVSxUzDtm8rU8DEQR6kkdXS1QnGHVNis/vgk8QzctkJKbtgDIaGzNUmDvTCyPZ8WLWSWJWb1oPxRZwpfXVP69ijU0Rme4/YkuHt6IEw6ANQIDAQAB";
+    // admin token created based on private_key.
+    private final String ADMIN_TOKEN = "eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTYyNTEzNjQyMn0.DAfbUPZwQURgGvor4scO0NoqoyHkCulKZkhP7kksCWFvgx6B22iKuXGX42EFlFSRMWYYgIJXV7UZATCLCjJpn_ijrO6AWBmooib3f94OPoLDdkF3qXnqaLnvJtl8_sCoLCSghR_O3hQFgQW2GRjMDdfJgl2_HXCWuzedtI5cQJdbpfU0NU10nzo7RtrpCmUdgQYQEHegYOawLqQVvr53ZGjrZilBXY9HHz1mSlnwZGNGVNNdvRthBuGtXtfKgtfSDF5jLqABvK8TUpdNJybibeiOspdzuY19-wVt4eVXzNAGsP4V4Zs91MgIUYV5lWKnBUuVWalppkMWhRF4Jf-KWQ";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add("admin");
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("test");
+
+        // Set provider domain name
+        Properties properties = new Properties();
+        properties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);
+
+        conf.setProperties(properties);
+        super.init();
+    }
+
+    // setup both admin and pulsar client
+    protected final void clientSetup() throws Exception {
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+                .build());
+
+        pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .authentication(AuthenticationFactory.token(ADMIN_TOKEN))
+                .build();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "batch")
+    public Object[][] codecProvider() {
+        return new Object[][] { { 0 }, { 1000 } };
+    }
+
+    public void testSyncProducerAndConsumer() throws Exception {
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    @Test
+    public void testTokenProducerAndConsumer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        clientSetup();
+
+        // test rest by admin
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        // test protocol by producer/consumer
+        testSyncProducerAndConsumer();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
new file mode 100644
index 0000000..b54d8c9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test Token authentication with:
+ *    client: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
+ *    broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+ */
+public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
+    private static final Logger log = LoggerFactory.getLogger(TokenOauth2AuthenticatedProducerConsumerTest.class);
+
+    // public key in oauth2 server to verify the client passed in token. get from https://jwt.io/
+    private final String TOKEN_TEST_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB";
+
+    private final String ADMIN_ROLE = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients";
+
+    // Credentials File, which contains "client_id" and "client_secret"
+    private final String CREDENTIALS_FILE = "./src/test/resources/authentication/token/credentials_file.json";
+
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        conf.setAuthenticationEnabled(true);
+        conf.setAuthorizationEnabled(true);
+
+        Set<String> superUserRoles = new HashSet<>();
+        superUserRoles.add(ADMIN_ROLE);
+        conf.setSuperUserRoles(superUserRoles);
+
+        Set<String> providers = new HashSet<>();
+        providers.add(AuthenticationProviderToken.class.getName());
+        conf.setAuthenticationProviders(providers);
+
+        conf.setClusterName("test");
+
+        // Set provider domain name
+        Properties properties = new Properties();
+        properties.setProperty("tokenPublicKey", TOKEN_TEST_PUBLIC_KEY);
+
+        conf.setProperties(properties);
+        super.init();
+    }
+
+    // setup both admin and pulsar client
+    protected final void clientSetup() throws Exception {
+        Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath();
+        log.info("Credentials File path: {}", path.toString());
+
+        // AuthenticationOAuth2
+        Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
+                new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"),
+                new URL("file://" + path.toString()),  // key file path
+                "https://dev-kt-aa9ne.us.auth0.com/api/v2/"
+        );
+
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(authentication)
+                .build());
+
+        pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .authentication(authentication)
+                .build();
+    }
+
+    @AfterMethod
+    @Override
+    protected void cleanup() throws Exception {
+        super.internalCleanup();
+    }
+
+    @DataProvider(name = "batch")
+    public Object[][] codecProvider() {
+        return new Object[][] { { 0 }, { 1000 } };
+    }
+
+    public void testSyncProducerAndConsumer() throws Exception {
+        Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+                .subscriptionName("my-subscriber-name").subscribe();
+
+        ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+
+        Producer<byte[]> producer = producerBuilder.create();
+        for (int i = 0; i < 10; i++) {
+            String message = "my-message-" + i;
+            producer.send(message.getBytes());
+        }
+
+        Message<byte[]> msg = null;
+        Set<String> messageSet = Sets.newHashSet();
+        for (int i = 0; i < 10; i++) {
+            msg = consumer.receive(5, TimeUnit.SECONDS);
+            String receivedMessage = new String(msg.getData());
+            log.debug("Received message: [{}]", receivedMessage);
+            String expectedMessage = "my-message-" + i;
+            testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+        }
+        // Acknowledge the consumption of all messages at once
+        consumer.acknowledgeCumulative(msg);
+        consumer.close();
+    }
+
+    @Test
+    public void testTokenProducerAndConsumer() throws Exception {
+        log.info("-- Starting {} test --", methodName);
+        clientSetup();
+
+        // test rest by admin
+        admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+        admin.tenants().createTenant("my-property",
+                new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+        admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+        // test protocol by producer/consumer
+        testSyncProducerAndConsumer();
+
+        log.info("-- Exiting {} test --", methodName);
+    }
+
+}
diff --git a/pulsar-broker/src/test/resources/authentication/token/credentials_file.json b/pulsar-broker/src/test/resources/authentication/token/credentials_file.json
new file mode 100644
index 0000000..db1eccd
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/token/credentials_file.json
@@ -0,0 +1,4 @@
+{
+  "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+  "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb"
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java
new file mode 100644
index 0000000..59810f5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+/**
+ * Provide OAuth 2.0 authentication data.
+ */
+class AuthenticationDataOAuth2 implements AuthenticationDataProvider {
+    public static final String HTTP_HEADER_NAME = "Authorization";
+
+    private final String accessToken;
+    private final Set<Map.Entry<String, String>> headers;
+
+    public AuthenticationDataOAuth2(String accessToken) {
+        this.accessToken = accessToken;
+        this.headers = Collections.singletonMap(HTTP_HEADER_NAME, "Bearer " + accessToken).entrySet();
+    }
+
+    @Override
+    public boolean hasDataForHttp() {
+        return true;
+    }
+
+    @Override
+    public Set<Map.Entry<String, String>> getHttpHeaders() {
+        return this.headers;
+    }
+
+    @Override
+    public boolean hasDataFromCommand() {
+        return true;
+    }
+
+    @Override
+    public String getCommandData() {
+        return this.accessToken;
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
new file mode 100644
index 0000000..54da5287d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.net.URL;
+import java.time.Clock;
+import org.apache.pulsar.client.api.Authentication;
+
+/**
+ * Factory class that allows to create {@link Authentication} instances
+ * for OAuth 2.0 authentication methods.
+ */
+public final class AuthenticationFactoryOAuth2 {
+
+    /**
+     * Authenticate with client credentials.
+     *
+     * @param issuerUrl the issuer URL
+     * @param credentialsUrl the credentials URL
+     * @param audience the audience identifier
+     * @return an Authentication object
+     */
+    public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
+        ClientCredentialsFlow flow = ClientCredentialsFlow.builder()
+                .issuerUrl(issuerUrl)
+                .privateKey(credentialsUrl.toExternalForm())
+                .audience(audience)
+                .build();
+        return new AuthenticationOAuth2(flow, Clock.systemDefaultZone());
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
new file mode 100644
index 0000000..f7f41d0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.AuthenticationUtil;
+
+/**
+ * Pulsar client authentication provider based on OAuth 2.0.
+ */
+@Slf4j
+public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport {
+
+    public static final String CONFIG_PARAM_TYPE = "type";
+    public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials";
+    public static final String AUTH_METHOD_NAME = "token";
+    public static final double EXPIRY_ADJUSTMENT = 0.9;
+    private static final long serialVersionUID = 1L;
+
+    final Clock clock;
+    Flow flow;
+    transient CachedToken cachedToken;
+
+    public AuthenticationOAuth2() {
+        this.clock = Clock.systemDefaultZone();
+    }
+
+    AuthenticationOAuth2(Flow flow, Clock clock) {
+        this.flow = flow;
+        this.clock = clock;
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return AUTH_METHOD_NAME;
+    }
+
+    @Override
+    public void configure(String encodedAuthParamString) {
+        if (StringUtils.isBlank(encodedAuthParamString)) {
+            throw new IllegalArgumentException("No authentication parameters were provided");
+        }
+        Map<String, String> params;
+        try {
+            params = AuthenticationUtil.configureFromJsonString(encodedAuthParamString);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Malformed authentication parameters", e);
+        }
+
+        String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS);
+        switch(type) {
+            case TYPE_CLIENT_CREDENTIALS:
+                this.flow = ClientCredentialsFlow.fromParameters(params);
+                break;
+            default:
+                throw new IllegalArgumentException("Unsupported authentication type: " + type);
+        }
+    }
+
+    @Override
+    @Deprecated
+    public void configure(Map<String, String> authParams) {
+        throw new NotImplementedException("Deprecated; use EncodedAuthenticationParameterSupport");
+    }
+
+    @Override
+    public void start() throws PulsarClientException {
+        flow.initialize();
+    }
+
+    @Override
+    public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException {
+        if (this.cachedToken == null || this.cachedToken.isExpired()) {
+            TokenResult tr = this.flow.authenticate();
+            this.cachedToken = new CachedToken(tr);
+        }
+        return this.cachedToken.getAuthData();
+    }
+
+    @Override
+    public void close() throws IOException {
+        flow.close();
+    }
+
+    @Data
+    class CachedToken {
+        private final TokenResult latest;
+        private final Instant expiresAt;
+        private final AuthenticationDataOAuth2 authData;
+
+        public CachedToken(TokenResult latest) {
+            this.latest = latest;
+            int adjustedExpiresIn = (int) (latest.getExpiresIn() * EXPIRY_ADJUSTMENT);
+            this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn);
+            this.authData = new AuthenticationDataOAuth2(latest.getAccessToken());
+        }
+
+        public boolean isExpired() {
+            return AuthenticationOAuth2.this.clock.instant().isAfter(this.expiresAt);
+        }
+    }
+}
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
new file mode 100644
index 0000000..13bf0f5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.entity.ContentType;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Implementation of OAuth 2.0 Client Credentials flow.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
+ */
+@Slf4j
+class ClientCredentialsFlow extends FlowBase {
+    public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
+    public static final String CONFIG_PARAM_AUDIENCE = "audience";
+    public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
+
+    private static final long serialVersionUID = 1L;
+
+    private final String audience;
+    private final String privateKey;
+
+    private transient ClientCredentialsExchanger exchanger;
+
+    @Builder
+    public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey) {
+        super(issuerUrl);
+        this.audience = audience;
+        this.privateKey = privateKey;
+    }
+
+    @Override
+    public void initialize() throws PulsarClientException {
+        super.initialize();
+        assert this.metadata != null;
+
+        URL tokenUrl = this.metadata.getTokenEndpoint();
+        this.exchanger = new TokenClient(tokenUrl);
+    }
+
+    public TokenResult authenticate() throws PulsarClientException {
+        // read the private key from storage
+        KeyFile keyFile;
+        try {
+            keyFile = loadPrivateKey(this.privateKey);
+        } catch (IOException e) {
+            throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage());
+        }
+
+        // request an access token using client credentials
+        ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder()
+                .clientId(keyFile.getClientId())
+                .clientSecret(keyFile.getClientSecret())
+                .audience(this.audience)
+                .build();
+        TokenResult tr;
+        try {
+            tr = this.exchanger.exchangeClientCredentials(req);
+        } catch (TokenExchangeException | IOException e) {
+            throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: "
+                                                                    + e.getMessage());
+        }
+
+        return tr;
+    }
+
+    @Override
+    public void close() {
+        exchanger.close();
+    }
+
+    /**
+     * Constructs a {@link ClientCredentialsFlow} from configuration parameters.
+     * @param params
+     * @return
+     */
+    public static ClientCredentialsFlow fromParameters(Map<String, String> params) {
+        URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
+        String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE);
+        String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE);
+        return ClientCredentialsFlow.builder()
+                .issuerUrl(issuerUrl)
+                .audience(audience)
+                .privateKey(privateKeyUrl)
+                .build();
+    }
+
+    /**
+     * Loads the private key from the given URL.
+     * @param privateKeyURL
+     * @return
+     * @throws IOException
+     */
+    private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException {
+        try {
+            URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection();
+
+            String protocol = urlConnection.getURL().getProtocol();
+            String contentType = urlConnection.getContentType();
+            if ("data".equals(protocol) && !ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) {
+                throw new IllegalArgumentException(
+                        "Unsupported media type or encoding format: " + urlConnection.getContentType());
+            }
+            KeyFile privateKey;
+            try (Reader r = new InputStreamReader((InputStream) urlConnection.getContent(), StandardCharsets.UTF_8)) {
+                privateKey = KeyFile.fromJson(r);
+            }
+            return privateKey;
+        } catch (URISyntaxException | InstantiationException | IllegalAccessException e) {
+            throw new IOException("Invalid privateKey format", e);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
new file mode 100644
index 0000000..b572325
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.Serializable;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * An OAuth 2.0 authorization flow.
+ */
+interface Flow extends Serializable {
+
+    /**
+     * Initializes the authorization flow.
+     * @throws PulsarClientException if the flow could not be initialized.
+     */
+    void initialize() throws PulsarClientException;
+
+    /**
+     * Acquires an access token from the OAuth 2.0 authorization server.
+     * @return a token result including an access token and optionally a refresh token.
+     * @throws PulsarClientException if authentication failed.
+     */
+    TokenResult authenticate() throws PulsarClientException;
+
+    /**
+     * Closes the authorization flow.
+     */
+    void close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
new file mode 100644
index 0000000..0f47121
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * An abstract OAuth 2.0 authorization flow.
+ */
+@Slf4j
+abstract class FlowBase implements Flow {
+
+    private static final long serialVersionUID = 1L;
+
+    protected final URL issuerUrl;
+
+    protected transient Metadata metadata;
+
+    protected FlowBase(URL issuerUrl) {
+        this.issuerUrl = issuerUrl;
+    }
+
+    public void initialize() throws PulsarClientException {
+        try {
+            this.metadata = createMetadataResolver().resolve();
+        } catch (IOException e) {
+            log.error("Unable to retrieve OAuth 2.0 server metadata", e);
+            throw new PulsarClientException.AuthenticationException("Unable to retrieve OAuth 2.0 server metadata");
+        }
+    }
+
+    protected MetadataResolver createMetadataResolver() {
+        return DefaultMetadataResolver.fromIssuerUrl(issuerUrl);
+    }
+
+    static String parseParameterString(Map<String, String> params, String name) {
+        String s = params.get(name);
+        if (StringUtils.isEmpty(s)) {
+            throw new IllegalArgumentException("Required configuration parameter: " + name);
+        }
+        return s;
+    }
+
+    static URL parseParameterUrl(Map<String, String> params, String name) {
+        String s = params.get(name);
+        if (StringUtils.isEmpty(s)) {
+            throw new IllegalArgumentException("Required configuration parameter: " + name);
+        }
+        try {
+            return new URL(s);
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException("Malformed configuration parameter: " + name);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
new file mode 100644
index 0000000..b4a6510
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.Reader;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * A JSON object representing a credentials file.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KeyFile {
+
+    private static ObjectMapper objectMapper = new ObjectMapper();
+
+    @JsonProperty("type")
+    private String type;
+
+    @JsonProperty("client_id")
+    private String clientId;
+
+    @JsonProperty("client_secret")
+    private String clientSecret;
+
+    @JsonProperty("client_email")
+    private String clientEmail;
+
+    @JsonProperty("issuer_url")
+    private String issuerUrl;
+
+    public String toJson() throws IOException {
+        return objectMapper.writeValueAsString(this);
+    }
+
+    public static KeyFile fromJson(String value) throws IOException {
+        return objectMapper.readValue(value, KeyFile.class);
+    }
+
+    public static KeyFile fromJson(Reader value) throws IOException {
+        return objectMapper.readValue(value, KeyFile.class);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md
new file mode 100644
index 0000000..cca7973
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md
@@ -0,0 +1,94 @@
+# Pulsar Client Authentication Plugin for OAuth 2.0
+
+Pulsar supports authenticating clients using OAuth 2.0 access tokens.
+
+You can use tokens to identify a Pulsar client and associate with some "principal" (or "role") that is permitted
+to do some actions (eg: publish to a topic or consume from a topic). 
+
+This module is to support Pulsar Client Authentication Plugin for OAuth 2.0. And after communicate with Oauth 2.0 server, 
+client will get an `access token` from Oauth 2.0 server, and will pass this `access token` to Pulsar broker to do the authentication.
+So the Broker side could still use `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`,
+also user can add their own `AuthenticationProvider` to work with this module.
+
+## Provider Configuration
+This library allows you to authenticate using an access token that is obtained from an OAuth 2.0 authorization service,
+which acts as a _token issuer_.
+
+### Authentication Types
+The authentication type determines how to obtain an access token via an OAuth 2.0 authorization flow.
+
+#### Client Credentials
+The following parameters are supported:
+
+| Parameter  | Description  | Example |
+|---|---|---|
+| `type` | Oauth 2.0 auth type. Optional. | default: `client_credentials`  |
+| `issuerUrl` | URL of the provider which allows Pulsar to obtain an access token. Required. | `https://accounts.google.com` |
+| `privateKey` | URL to a JSON credentials file (in JSON format; see below). Required. | See "Supported Pattern Formats" |
+| `audience`  | An OAuth 2.0 "resource server" identifier for the Pulsar cluster. Required. | `https://broker.example.com` |
+
+### Supported Pattern Formats of `privateKey`
+The `privateKey` parameter supports the following three pattern formats, and contains client Credentials:
+
+- `file:///path/to/file`
+- `file:/path/to/file`
+- `data:application/json;base64,<base64-encoded value>`
+
+The credentials file contains service account credentials for use with the Client Credentials authentication type.
+
+For example of a credentials file `credentials_file.json`:
+```json
+{
+  "type": "client_credentials",
+  "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
+  "client_secret": "on1uJ...k6F6R",
+  "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
+  "issuer_url": "https://accounts.google.com"
+}
+```
+
+The default type is `client_credentials`, and for this type, fields "client_id" and "client_secret" is required.
+
+### Example for a typical original Oauth2 request mapping
+
+A typical original Oauth2 request, which used to get access token from Oauth2 server, is like this: 
+
+```bash
+curl --request POST \
+  --url https://dev-kt-aa9ne.us.auth0.com/oauth/token \
+  --header 'content-type: application/json' \
+  --data '{
+  "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+  "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+  "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
+  "grant_type":"client_credentials"}'
+```
+
+In which,
+- `issuerUrl` parameter in this plugin is mapped to `--url https://dev-kt-aa9ne.us.auth0.com/oauth/token`
+- `privateKey` file parameter in this plugin should at least contains fields `client_id` and `client_secret`.
+- `audience` parameter in this plugin is mapped to  `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`
+
+## Pulsar Client Config
+You can use the provider with the following Pulsar clients.
+
+### Java
+You can use the factory method:
+```java
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(
+        AuthenticationFactoryOAuth2.clientCredentials(this.issuerUrl, this.credentialsUrl, this.audience))
+    .build();
+```
+
+Similarly, you can use encoded parameters:
+```java
+Authentication auth = AuthenticationFactory
+    .create(AuthenticationOAuth2.class.getName(), "{"type":"client_credentials","privateKey":"...","issuerUrl":"...","audience":"..."}");
+
+PulsarClient client = PulsarClient.builder()
+    .serviceUrl("pulsar://broker.example.com:6650/")
+    .authentication(auth)
+    .build();
+```
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java
new file mode 100644
index 0000000..3beeda0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
new file mode 100644
index 0000000..7c14296
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A token request based on the exchange of client credentials.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
+ */
+@Data
+@Builder
+public class ClientCredentialsExchangeRequest {
+
+    @JsonProperty("client_id")
+    private String clientId;
+
+    @JsonProperty("client_secret")
+    private String clientSecret;
+
+    @JsonProperty("audience")
+    private String audience;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
new file mode 100644
index 0000000..e6a956a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.IOException;
+
+/**
+ * An interface for exchanging client credentials for an access token.
+ */
+public interface ClientCredentialsExchanger {
+    /**
+     * Requests an exchange of client credentials for an access token.
+     * @param req the request details.
+     * @return an access token.
+     * @throws TokenExchangeException if the OAuth server returned a detailed error.
+     * @throws IOException if a general IO error occurred.
+     */
+    TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
+            throws TokenExchangeException, IOException;
+
+    /**
+     * Closes the exchanger.
+     */
+    void close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
new file mode 100644
index 0000000..d16ce8b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.time.Duration;
+
+/**
+ * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ */
+public class DefaultMetadataResolver implements MetadataResolver {
+
+    private final URL metadataUrl;
+    private final ObjectReader objectReader;
+    private Duration connectTimeout;
+    private Duration readTimeout;
+
+    public DefaultMetadataResolver(URL metadataUrl) {
+        this.metadataUrl = metadataUrl;
+        this.objectReader = new ObjectMapper().readerFor(Metadata.class);
+    }
+
+    public DefaultMetadataResolver withConnectTimeout(Duration connectTimeout) {
+        this.connectTimeout = connectTimeout;
+        return this;
+    }
+
+    public DefaultMetadataResolver withReadTimeout(Duration readTimeout) {
+        this.readTimeout = readTimeout;
+        return this;
+    }
+
+    /**
+     * Resolves the authorization metadata.
+     * @return metadata
+     * @throws IOException if the metadata could not be resolved.
+     */
+    public Metadata resolve() throws IOException {
+        try {
+            URLConnection c = this.metadataUrl.openConnection();
+            if (connectTimeout != null) {
+                c.setConnectTimeout((int) connectTimeout.toMillis());
+            }
+            if (readTimeout != null) {
+                c.setReadTimeout((int) readTimeout.toMillis());
+            }
+            c.setRequestProperty("Accept", "application/json");
+
+            Metadata metadata;
+            try (InputStream inputStream = c.getInputStream()) {
+                metadata = this.objectReader.readValue(inputStream);
+            }
+            return metadata;
+
+        } catch (IOException e) {
+            throw new IOException("Cannot obtain authorization metadata from " + metadataUrl.toString(), e);
+        }
+    }
+
+    /**
+     * Gets a well-known metadata URL for the given OAuth issuer URL.
+     * @param issuerUrl The authorization server's issuer identifier
+     * @return a resolver
+     */
+    public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl) {
+        return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl));
+    }
+
+    /**
+     * Gets a well-known metadata URL for the given OAuth issuer URL.
+     * @see <a href="https://tools.ietf.org/id/draft-ietf-oauth-discovery-08.html#ASConfig">
+     *     OAuth Discovery: Obtaining Authorization Server Metadata</a>
+     * @param issuerUrl The authorization server's issuer identifier
+     * @return a URL
+     */
+    public static URL getWellKnownMetadataUrl(URL issuerUrl) {
+        try {
+            return new URL(issuerUrl, "/.well-known/openid-configuration");
+        } catch (MalformedURLException e) {
+            throw new IllegalArgumentException(e);
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java
new file mode 100644
index 0000000..93f65be
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URL;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Represents OAuth 2.0 Server Metadata.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Metadata {
+    @JsonProperty("issuer")
+    private URL issuer;
+
+    @JsonProperty("authorization_endpoint")
+    private URL authorizationEndpoint;
+
+    @JsonProperty("token_endpoint")
+    private URL tokenEndpoint;
+
+    @JsonProperty("userinfo_endpoint")
+    private URL userInfoEndpoint;
+
+    @JsonProperty("revocation_endpoint")
+    private URL revocationEndpoint;
+
+    @JsonProperty("jwks_uri")
+    private URL jwksUri;
+
+    @JsonProperty("device_authorization_endpoint")
+    private URL deviceAuthorizationEndpoint;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java
new file mode 100644
index 0000000..85a6a0b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.IOException;
+
+/**
+ * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ */
+public interface MetadataResolver {
+    Metadata resolve() throws IOException;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
new file mode 100644
index 0000000..715579d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.http.Consts;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * A client for an OAuth 2.0 token endpoint.
+ */
+public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
+
+    private static final ObjectReader resultReader;
+    private static final ObjectReader errorReader;
+
+    static {
+        resultReader = new ObjectMapper().readerFor(TokenResult.class);
+        errorReader = new ObjectMapper().readerFor(TokenError.class);
+    }
+
+    private final URL tokenUrl;
+    private final CloseableHttpClient httpclient;
+
+    public TokenClient(URL tokenUrl) {
+        this.tokenUrl = tokenUrl;
+        this.httpclient = HttpClientBuilder.create().useSystemProperties().disableCookieManagement().build();
+    }
+
+    public void close() {
+    }
+
+    /**
+     * Performs a token exchange using client credentials.
+     * @param req the client credentials request details.
+     * @return a token result
+     * @throws TokenExchangeException
+     */
+    public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
+            throws TokenExchangeException, IOException {
+        List<NameValuePair> params = new ArrayList<>(4);
+        params.add(new BasicNameValuePair("grant_type", "client_credentials"));
+        params.add(new BasicNameValuePair("client_id", req.getClientId()));
+        params.add(new BasicNameValuePair("client_secret", req.getClientSecret()));
+        params.add(new BasicNameValuePair("audience", req.getAudience()));
+        HttpPost post = new HttpPost(tokenUrl.toString());
+        post.setHeader("Accept", ContentType.APPLICATION_JSON.getMimeType());
+        post.setEntity(new UrlEncodedFormEntity(params, Consts.UTF_8));
+
+        try (CloseableHttpResponse response = httpclient.execute(post)) {
+            StatusLine status = response.getStatusLine();
+            HttpEntity entity = response.getEntity();
+            try {
+                switch(status.getStatusCode()) {
+                    case HttpURLConnection.HTTP_OK:
+                        return readResponse(entity, resultReader);
+                    case HttpURLConnection.HTTP_BAD_REQUEST:
+                    case HttpURLConnection.HTTP_UNAUTHORIZED:
+                        throw new TokenExchangeException(readResponse(entity, errorReader));
+                    default:
+                        throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase());
+                }
+            } finally {
+                EntityUtils.consume(entity);
+            }
+        }
+    }
+
+    private static <T> T readResponse(HttpEntity entity, ObjectReader objectReader) throws IOException {
+        ContentType contentType = ContentType.getOrDefault(entity);
+        if (!ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) {
+            throw new ClientProtocolException("Unsupported content type: " + contentType.getMimeType());
+        }
+        Charset charset = contentType.getCharset();
+        if (charset == null) {
+            charset = StandardCharsets.UTF_8;
+        }
+        try (Reader reader = new InputStreamReader(entity.getContent(), charset)) {
+            @SuppressWarnings("unchecked") T obj = (T) objectReader.readValue(reader);
+            return obj;
+        }
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java
new file mode 100644
index 0000000..5f050a1
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Represents an error returned from an OAuth 2.0 token endpoint.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TokenError {
+    @JsonProperty("error")
+    private String error;
+
+    @JsonProperty("error_description")
+    private String errorDescription;
+
+    @JsonProperty("error_uri")
+    private String errorUri;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java
new file mode 100644
index 0000000..286051f
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+/**
+ * Indicates a token exchange failure.
+ */
+public class TokenExchangeException extends Exception {
+    private TokenError error;
+
+    public TokenExchangeException(TokenError error) {
+        super(String.format("%s (%s)", error.getErrorDescription(), error.getError()));
+        this.error = error;
+    }
+
+    public TokenError getError() {
+        return error;
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java
new file mode 100644
index 0000000..8b333c0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * The result of a token exchange request.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TokenResult implements Serializable {
+    private static final long serialVersionUID = 1L;
+
+    @JsonProperty("access_token")
+    private String accessToken;
+
+    @JsonProperty("id_token")
+    private String idToken;
+
+    @JsonProperty("refresh_token")
+    private String refreshToken;
+
+    @JsonProperty("expires_in")
+    private int expiresIn;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java
new file mode 100644
index 0000000..2068111
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
new file mode 100644
index 0000000..f45c2c0
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link AuthenticationOAuth2}.
+ */
+public class AuthenticationOAuth2Test {
+    private static final String TEST_ACCESS_TOKEN = "x.y.z";
+    private static final int TEST_EXPIRES_IN = 60;
+
+    private MockClock clock;
+    private Flow flow;
+    private AuthenticationOAuth2 auth;
+
+    @BeforeMethod
+    public void before() {
+        this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC);
+        this.flow = mock(Flow.class);
+        this.auth = new AuthenticationOAuth2(flow, this.clock);
+    }
+
+    @Test
+    public void testGetAuthMethodName() {
+        assertEquals(this.auth.getAuthMethodName(), "token");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*No.*")
+    public void testConfigureNoParams() throws Exception {
+        this.auth.configure("");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Malformed.*")
+    public void testConfigureMalformed() throws Exception {
+        this.auth.configure("{garbage}");
+    }
+
+    @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Required.*")
+    public void testConfigureRequired() throws Exception {
+        this.auth.configure("{}");
+    }
+
+    @Test
+    public void testConfigure() throws Exception {
+        Map<String, String> params = new HashMap<>();
+        params.put("type", "client_credentials");
+        params.put("privateKey", "data:base64,e30=");
+        params.put("issuerUrl", "http://localhost");
+        params.put("audience", "http://localhost");
+        ObjectMapper mapper = new ObjectMapper();
+        String authParams = mapper.writeValueAsString(params);
+        this.auth.configure(authParams);
+        assertNotNull(this.auth.flow);
+    }
+
+    @Test
+    public void testStart() throws Exception {
+        this.auth.start();
+        verify(this.flow).initialize();
+    }
+
+    @Test
+    public void testGetAuthData() throws Exception {
+        AuthenticationDataProvider data;
+        TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build();
+        doReturn(tr).when(this.flow).authenticate();
+        data = this.auth.getAuthData();
+        verify(this.flow, times(1)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+
+        // cache hit
+        data = this.auth.getAuthData();
+        verify(this.flow, times(1)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+
+        // cache miss
+        clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN));
+        data = this.auth.getAuthData();
+        verify(this.flow, times(2)).authenticate();
+        assertEquals(data.getCommandData(), tr.getAccessToken());
+    }
+
+    @Test
+    public void testClose() throws Exception {
+        this.auth.close();
+        verify(this.flow).close();
+    }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java
new file mode 100644
index 0000000..1e23311
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.io.Serializable;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+
+/**
+ * A clock where the current instant is manually adjustable.
+ */
+public class MockClock extends Clock implements Serializable {
+    private static final long serialVersionUID = 1L;
+    private Instant instant;
+    private final ZoneId zone;
+
+    public MockClock(Instant fixedInstant, ZoneId zone) {
+        this.instant = fixedInstant;
+        this.zone = zone;
+    }
+
+    @Override
+    public ZoneId getZone() {
+        return zone;
+    }
+
+    @Override
+    public Clock withZone(ZoneId zone) {
+        if (zone.equals(this.zone)) {
+            return this;
+        }
+        return new MockClock(instant, zone);
+    }
+
+    @Override
+    public long millis() {
+        return instant.toEpochMilli();
+    }
+
+    @Override
+    public Instant instant() {
+        return instant;
+    }
+
+    /**
+     * Sets the clock to the given instant.
+     * @param fixedInstant the instant
+     */
+    public void setInstant(Instant fixedInstant) {
+        this.instant = fixedInstant;
+    }
+
+    /**
+     * Advances the clock by the given duration.
+     * @param duration the duration
+     */
+    public void advance(Duration duration) {
+        this.instant = this.instant.plus(duration);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (obj instanceof MockClock) {
+            MockClock other = (MockClock) obj;
+            return instant.equals(other.instant) && zone.equals(other.zone);
+        }
+        return false;
+    }
+
+    @Override
+    public int hashCode() {
+        return instant.hashCode() ^ zone.hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return "MockClock[" + instant + "," + zone + "]";
+    }
+}