You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/29 09:39:05 UTC
[pulsar] 09/14: [client authentication] add authentication client
with oauth2 support (#7420)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit c86fe38266eb3181a9942993cecff65fdee1a511
Author: Jia Zhai <zh...@apache.org>
AuthorDate: Thu Jul 2 01:41:31 2020 -0500
[client authentication] add authentication client with oauth2 support (#7420)
### Motivation
Pulsar supports authenticating clients using OAuth 2.0 access tokens. A token identifies a Pulsar client and associates it with a "principal" (or "role") that is permitted to perform certain actions (e.g., publish to a topic or consume from a topic).
This module adds a Pulsar client authentication plugin that supports OAuth 2.0 directly. The client communicates with the OAuth 2.0 server to obtain an `access token`, and then passes this `access token` to the Pulsar broker to authenticate.
The broker side can therefore still use `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`;
users can also add their own `AuthenticationProvider` to work with this module.
### Modifications
- add related code;
- add related test;
- add related doc.
This client authentication module is initialized as follows:
```java
Authentication oauth2Authentication = AuthenticationFactoryOAuth2.clientCredentials(
new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"),
new URL("file:///path/to/credential/file.json"), // key file path
"https://dev-kt-aa9ne.us.auth0.com/api/v2/"
);
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://broker.example.com:6650/")
.authentication(oauth2Authentication)
.build();
```
### Verifying this change
tests passed.
(cherry picked from commit 768813ea595697e317a0ac5af53191e6cf832766)
---
.../TokenAuthenticatedProducerConsumerTest.java | 143 ++++++++++++++++++
...kenOauth2AuthenticatedProducerConsumerTest.java | 160 +++++++++++++++++++++
.../authentication/token/credentials_file.json | 4 +
.../impl/auth/oauth2/AuthenticationDataOAuth2.java | 60 ++++++++
.../auth/oauth2/AuthenticationFactoryOAuth2.java | 47 ++++++
.../impl/auth/oauth2/AuthenticationOAuth2.java | 131 +++++++++++++++++
.../impl/auth/oauth2/ClientCredentialsFlow.java | 146 +++++++++++++++++++
.../pulsar/client/impl/auth/oauth2/Flow.java | 47 ++++++
.../pulsar/client/impl/auth/oauth2/FlowBase.java | 80 +++++++++++
.../pulsar/client/impl/auth/oauth2/KeyFile.java | 66 +++++++++
.../pulsar/client/impl/auth/oauth2/Readme.md | 94 ++++++++++++
.../client/impl/auth/oauth2/package-info.java | 19 +++
.../protocol/ClientCredentialsExchangeRequest.java | 42 ++++++
.../protocol/ClientCredentialsExchanger.java | 41 ++++++
.../oauth2/protocol/DefaultMetadataResolver.java | 105 ++++++++++++++
.../client/impl/auth/oauth2/protocol/Metadata.java | 54 +++++++
.../auth/oauth2/protocol/MetadataResolver.java | 28 ++++
.../impl/auth/oauth2/protocol/TokenClient.java | 121 ++++++++++++++++
.../impl/auth/oauth2/protocol/TokenError.java | 41 ++++++
.../oauth2/protocol/TokenExchangeException.java | 35 +++++
.../impl/auth/oauth2/protocol/TokenResult.java | 51 +++++++
.../impl/auth/oauth2/protocol/package-info.java | 19 +++
.../impl/auth/oauth2/AuthenticationOAuth2Test.java | 122 ++++++++++++++++
.../pulsar/client/impl/auth/oauth2/MockClock.java | 97 +++++++++++++
24 files changed, 1753 insertions(+)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
new file mode 100644
index 0000000..874a34d
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenAuthenticatedProducerConsumerTest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test Token authentication with:
+ * client: org.apache.pulsar.client.impl.auth.AuthenticationToken
+ * broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+ */
+public class TokenAuthenticatedProducerConsumerTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(TokenAuthenticatedProducerConsumerTest.class);
+
+ // pre-create a public/private_key pair. Public key used for broker to verify client passed in token
+ private final String TOKEN_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAhHKgdY6arG7eE75bUPtznN5WjMu0sxLq7pI5Aaiw2Ijerbz33iO/Fdd2yJVuAZNDZPD/AVSaeliEh/BP+s2rN8KNuiywD+SlL1NGf2JDS5BvGT4Q8eHfDDRd/iY5zkK58wYwlke6C8fKCx10MTH9iYAJpzaaxs+Tu1RaatK+691aYSiMkYIfgbqAKmSCpK+48al/PkmENfuhzaTBPhCnEblhNvUhS5MjzBcAcGzecpEuVSxUzDtm8rU8DEQR6kkdXS1QnGHVNis/vgk8QzctkJKbtgDIaGzNUmDvTCyPZ8WLWSWJWb1oPxRZwpfXVP69ijU0Rme4/YkuHt6IEw6ANQIDAQAB";
+ // admin token created based on private_key.
+ private final String ADMIN_TOKEN = "eyJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImV4cCI6MTYyNTEzNjQyMn0.DAfbUPZwQURgGvor4scO0NoqoyHkCulKZkhP7kksCWFvgx6B22iKuXGX42EFlFSRMWYYgIJXV7UZATCLCjJpn_ijrO6AWBmooib3f94OPoLDdkF3qXnqaLnvJtl8_sCoLCSghR_O3hQFgQW2GRjMDdfJgl2_HXCWuzedtI5cQJdbpfU0NU10nzo7RtrpCmUdgQYQEHegYOawLqQVvr53ZGjrZilBXY9HHz1mSlnwZGNGVNNdvRthBuGtXtfKgtfSDF5jLqABvK8TUpdNJybibeiOspdzuY19-wVt4eVXzNAGsP4V4Zs91MgIUYV5lWKnBUuVWalppkMWhRF4Jf-KWQ";
+
+    /**
+     * Configures the broker for token authentication before each test: enables authentication and
+     * authorization, registers {@link AuthenticationProviderToken} as the only provider, declares
+     * "admin" as a super-user role and supplies the public key used to verify client tokens.
+     */
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // the public key the token provider uses to verify tokens passed in by clients
+        Properties brokerProperties = new Properties();
+        brokerProperties.setProperty("tokenPublicKey", TOKEN_PUBLIC_KEY);
+        conf.setProperties(brokerProperties);
+
+        conf.setClusterName("test");
+        conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+        conf.setSuperUserRoles(Sets.newHashSet("admin"));
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+
+        super.init();
+    }
+
+    /**
+     * Builds the (spied) admin client and the Pulsar client; each one authenticates with its own
+     * token authentication instance created from the pre-generated admin token.
+     */
+    protected final void clientSetup() throws Exception {
+        Authentication adminAuth = AuthenticationFactory.token(ADMIN_TOKEN);
+        PulsarAdmin realAdmin = PulsarAdmin.builder()
+                .serviceHttpUrl(brokerUrl.toString())
+                .authentication(adminAuth)
+                .build();
+        admin = spy(realAdmin);
+
+        String serviceUrl = new URI(pulsar.getBrokerServiceUrl()).toString();
+        Authentication clientAuth = AuthenticationFactory.token(ADMIN_TOKEN);
+        pulsarClient = PulsarClient.builder()
+                .serviceUrl(serviceUrl)
+                .statsInterval(0, TimeUnit.SECONDS)
+                .authentication(clientAuth)
+                .build();
+    }
+
+ @AfterMethod // runs after every test method
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup(); // all teardown is delegated to the base class
+ }
+
+ @DataProvider(name = "batch") // NOTE(review): not referenced by any @Test in this class
+ public Object[][] codecProvider() {
+ return new Object[][] { { 0 }, { 1000 } }; // two single-argument rows: 0 and 1000
+ }
+
+ public void testSyncProducerAndConsumer() throws Exception {
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+ .subscriptionName("my-subscriber-name").subscribe();
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+
+ Producer<byte[]> producer = producerBuilder.create();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ }
+
+ @Test
+ public void testTokenProducerAndConsumer() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ clientSetup();
+
+ // test rest by admin
+ admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+ // test protocol by producer/consumer
+ testSyncProducerAndConsumer();
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+}
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
new file mode 100644
index 0000000..b54d8c9
--- /dev/null
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/api/TokenOauth2AuthenticatedProducerConsumerTest.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.api;
+
+import static org.mockito.Mockito.spy;
+
+import com.google.common.collect.Sets;
+import java.net.URI;
+import java.net.URL;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.HashSet;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.pulsar.broker.authentication.AuthenticationProviderToken;
+import org.apache.pulsar.client.admin.PulsarAdmin;
+import org.apache.pulsar.client.impl.auth.oauth2.AuthenticationFactoryOAuth2;
+import org.apache.pulsar.common.policies.data.ClusterData;
+import org.apache.pulsar.common.policies.data.TenantInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.DataProvider;
+import org.testng.annotations.Test;
+
+/**
+ * Test Token authentication with:
+ * client: org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
+ * broker: org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+ */
+public class TokenOauth2AuthenticatedProducerConsumerTest extends ProducerConsumerBase {
+ private static final Logger log = LoggerFactory.getLogger(TokenOauth2AuthenticatedProducerConsumerTest.class);
+
+ // public key in oauth2 server to verify the client passed in token. get from https://jwt.io/
+ private final String TOKEN_TEST_PUBLIC_KEY = "data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB";
+
+ private final String ADMIN_ROLE = "Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients";
+
+ // Credentials File, which contains "client_id" and "client_secret"
+ private final String CREDENTIALS_FILE = "./src/test/resources/authentication/token/credentials_file.json";
+
+    /**
+     * Configures the broker for token authentication before each test: enables authentication and
+     * authorization, registers {@link AuthenticationProviderToken} as the only provider, declares
+     * the OAuth 2.0 client role as a super-user role and supplies the public key used to verify
+     * the access tokens passed in by clients.
+     */
+    @BeforeMethod
+    @Override
+    protected void setup() throws Exception {
+        // the public key the token provider uses to verify tokens passed in by clients
+        Properties brokerProperties = new Properties();
+        brokerProperties.setProperty("tokenPublicKey", TOKEN_TEST_PUBLIC_KEY);
+        conf.setProperties(brokerProperties);
+
+        conf.setClusterName("test");
+        conf.setAuthenticationProviders(Sets.newHashSet(AuthenticationProviderToken.class.getName()));
+        conf.setSuperUserRoles(Sets.newHashSet(ADMIN_ROLE));
+        conf.setAuthorizationEnabled(true);
+        conf.setAuthenticationEnabled(true);
+
+        super.init();
+    }
+
+    /**
+     * Builds the (spied) admin client and the Pulsar client, both sharing one OAuth 2.0
+     * client-credentials authentication that reads its client id/secret from the credentials file.
+     *
+     * @throws Exception if a URL is malformed or a client cannot be built
+     */
+    protected final void clientSetup() throws Exception {
+        Path path = Paths.get(CREDENTIALS_FILE).toAbsolutePath().normalize();
+        log.info("Credentials File path: {}", path);
+
+        // Path#toUri yields a well-formed, percent-encoded file: URI on every platform, whereas
+        // concatenating "file://" with the raw path breaks for paths containing spaces and for
+        // Windows drive paths
+        URL credentialsUrl = path.toUri().toURL();
+
+        // AuthenticationOAuth2
+        Authentication authentication = AuthenticationFactoryOAuth2.clientCredentials(
+                new URL("https://dev-kt-aa9ne.us.auth0.com/oauth/token"),
+                credentialsUrl, // key file path
+                "https://dev-kt-aa9ne.us.auth0.com/api/v2/"
+        );
+
+        admin = spy(PulsarAdmin.builder().serviceHttpUrl(brokerUrl.toString())
+                .authentication(authentication)
+                .build());
+
+        pulsarClient = PulsarClient.builder().serviceUrl(new URI(pulsar.getBrokerServiceUrl()).toString())
+                .statsInterval(0, TimeUnit.SECONDS)
+                .authentication(authentication)
+                .build();
+    }
+
+ @AfterMethod // runs after every test method
+ @Override
+ protected void cleanup() throws Exception {
+ super.internalCleanup(); // all teardown is delegated to the base class
+ }
+
+ @DataProvider(name = "batch") // NOTE(review): not referenced by any @Test in this class
+ public Object[][] codecProvider() {
+ return new Object[][] { { 0 }, { 1000 } }; // two single-argument rows: 0 and 1000
+ }
+
+ public void testSyncProducerAndConsumer() throws Exception {
+ Consumer<byte[]> consumer = pulsarClient.newConsumer().topic("persistent://my-property/my-ns/my-topic")
+ .subscriptionName("my-subscriber-name").subscribe();
+
+ ProducerBuilder<byte[]> producerBuilder = pulsarClient.newProducer().topic("persistent://my-property/my-ns/my-topic");
+
+ Producer<byte[]> producer = producerBuilder.create();
+ for (int i = 0; i < 10; i++) {
+ String message = "my-message-" + i;
+ producer.send(message.getBytes());
+ }
+
+ Message<byte[]> msg = null;
+ Set<String> messageSet = Sets.newHashSet();
+ for (int i = 0; i < 10; i++) {
+ msg = consumer.receive(5, TimeUnit.SECONDS);
+ String receivedMessage = new String(msg.getData());
+ log.debug("Received message: [{}]", receivedMessage);
+ String expectedMessage = "my-message-" + i;
+ testMessageOrderAndDuplicates(messageSet, receivedMessage, expectedMessage);
+ }
+ // Acknowledge the consumption of all messages at once
+ consumer.acknowledgeCumulative(msg);
+ consumer.close();
+ }
+
+ @Test
+ public void testTokenProducerAndConsumer() throws Exception {
+ log.info("-- Starting {} test --", methodName);
+ clientSetup();
+
+ // test rest by admin
+ admin.clusters().createCluster("test", new ClusterData(brokerUrl.toString()));
+ admin.tenants().createTenant("my-property",
+ new TenantInfo(Sets.newHashSet("appid1", "appid2"), Sets.newHashSet("test")));
+ admin.namespaces().createNamespace("my-property/my-ns", Sets.newHashSet("test"));
+
+ // test protocol by producer/consumer
+ testSyncProducerAndConsumer();
+
+ log.info("-- Exiting {} test --", methodName);
+ }
+
+}
diff --git a/pulsar-broker/src/test/resources/authentication/token/credentials_file.json b/pulsar-broker/src/test/resources/authentication/token/credentials_file.json
new file mode 100644
index 0000000..db1eccd
--- /dev/null
+++ b/pulsar-broker/src/test/resources/authentication/token/credentials_file.json
@@ -0,0 +1,4 @@
+{
+ "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+ "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb"
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java
new file mode 100644
index 0000000..59810f5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationDataOAuth2.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.util.Collections;
+import java.util.Map;
+import java.util.Set;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+
+/**
+ * Authentication data carrying an OAuth 2.0 access token.
+ *
+ * <p>Over HTTP the token is exposed as an {@code Authorization: Bearer <token>} header; for the
+ * command channel the raw access token is returned as the command data.
+ */
+class AuthenticationDataOAuth2 implements AuthenticationDataProvider {
+    public static final String HTTP_HEADER_NAME = "Authorization";
+
+    private final String accessToken;
+    private final Set<Map.Entry<String, String>> headers;
+
+    /**
+     * @param accessToken the access token to hand out
+     */
+    public AuthenticationDataOAuth2(String accessToken) {
+        final String bearerValue = "Bearer " + accessToken;
+        final Map<String, String> singleHeader = Collections.singletonMap(HTTP_HEADER_NAME, bearerValue);
+        this.accessToken = accessToken;
+        // computed once; the same entry set is returned on every getHttpHeaders() call
+        this.headers = singleHeader.entrySet();
+    }
+
+    /** @return always {@code true} */
+    @Override
+    public boolean hasDataForHttp() {
+        return true;
+    }
+
+    /** @return the single {@code Authorization} header built in the constructor */
+    @Override
+    public Set<Map.Entry<String, String>> getHttpHeaders() {
+        return headers;
+    }
+
+    /** @return always {@code true} */
+    @Override
+    public boolean hasDataFromCommand() {
+        return true;
+    }
+
+    /** @return the raw access token, without the {@code Bearer} prefix */
+    @Override
+    public String getCommandData() {
+        return accessToken;
+    }
+
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
new file mode 100644
index 0000000..54da5287d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationFactoryOAuth2.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.net.URL;
+import java.time.Clock;
+import org.apache.pulsar.client.api.Authentication;
+
+/**
+ * Static factory for {@link Authentication} instances backed by OAuth 2.0 authentication methods.
+ */
+public final class AuthenticationFactoryOAuth2 {
+
+    /**
+     * Creates an authentication that uses the client credentials flow.
+     *
+     * @param issuerUrl the issuer URL
+     * @param credentialsUrl the URL of the credentials (key file); passed to the flow in its
+     *                       external string form
+     * @param audience the audience identifier
+     * @return an Authentication object driven by the system default-zone clock
+     */
+    public static Authentication clientCredentials(URL issuerUrl, URL credentialsUrl, String audience) {
+        final String credentialsLocation = credentialsUrl.toExternalForm();
+        final Flow clientCredentialsFlow = ClientCredentialsFlow.builder()
+                .audience(audience)
+                .privateKey(credentialsLocation)
+                .issuerUrl(issuerUrl)
+                .build();
+        final Clock systemClock = Clock.systemDefaultZone();
+        return new AuthenticationOAuth2(clientCredentialsFlow, systemClock);
+    }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
new file mode 100644
index 0000000..f7f41d0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2.java
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.IOException;
+import java.time.Clock;
+import java.time.Instant;
+import java.util.Map;
+import lombok.Data;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.NotImplementedException;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.Authentication;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.api.EncodedAuthenticationParameterSupport;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.AuthenticationUtil;
+
+/**
+ * Pulsar client authentication provider based on OAuth 2.0.
+ *
+ * <p>The provider delegates token acquisition to a {@link Flow} and caches the most recent token
+ * until {@link #EXPIRY_ADJUSTMENT} of its advertised lifetime has elapsed. The flow is supplied
+ * either through the package-private constructor or by {@link #configure(String)}.
+ */
+@Slf4j
+public class AuthenticationOAuth2 implements Authentication, EncodedAuthenticationParameterSupport {
+
+    public static final String CONFIG_PARAM_TYPE = "type";
+    public static final String TYPE_CLIENT_CREDENTIALS = "client_credentials";
+    public static final String AUTH_METHOD_NAME = "token";
+    /** Fraction of a token's {@code expires_in} after which the cached token is treated as expired. */
+    public static final double EXPIRY_ADJUSTMENT = 0.9;
+    private static final long serialVersionUID = 1L;
+
+    final Clock clock;
+    // null until configure(String) succeeds when the no-arg constructor is used
+    Flow flow;
+    transient CachedToken cachedToken;
+
+    /**
+     * Creates an unconfigured provider using the system default-zone clock;
+     * {@link #configure(String)} must be called before the provider is started.
+     */
+    public AuthenticationOAuth2() {
+        this.clock = Clock.systemDefaultZone();
+    }
+
+    /**
+     * @param flow the flow used to acquire tokens
+     * @param clock the clock used to compute and check token expiry
+     */
+    AuthenticationOAuth2(Flow flow, Clock clock) {
+        this.flow = flow;
+        this.clock = clock;
+    }
+
+    @Override
+    public String getAuthMethodName() {
+        return AUTH_METHOD_NAME;
+    }
+
+    /**
+     * Configures the flow from a JSON-encoded parameter string.
+     *
+     * @param encodedAuthParamString JSON parameters; the optional {@value #CONFIG_PARAM_TYPE}
+     *                               entry defaults to {@value #TYPE_CLIENT_CREDENTIALS}
+     * @throws IllegalArgumentException if the string is blank, malformed, or names an unsupported type
+     */
+    @Override
+    public void configure(String encodedAuthParamString) {
+        if (StringUtils.isBlank(encodedAuthParamString)) {
+            throw new IllegalArgumentException("No authentication parameters were provided");
+        }
+        Map<String, String> params;
+        try {
+            params = AuthenticationUtil.configureFromJsonString(encodedAuthParamString);
+        } catch (IOException e) {
+            throw new IllegalArgumentException("Malformed authentication parameters", e);
+        }
+
+        String type = params.getOrDefault(CONFIG_PARAM_TYPE, TYPE_CLIENT_CREDENTIALS);
+        // equals() instead of a String switch: a key that is present with a null value is
+        // reported as an unsupported type rather than raising a NullPointerException
+        if (TYPE_CLIENT_CREDENTIALS.equals(type)) {
+            this.flow = ClientCredentialsFlow.fromParameters(params);
+        } else {
+            throw new IllegalArgumentException("Unsupported authentication type: " + type);
+        }
+    }
+
+    /**
+     * Not supported.
+     *
+     * @throws NotImplementedException always
+     */
+    @Override
+    @Deprecated
+    public void configure(Map<String, String> authParams) {
+        throw new NotImplementedException("Deprecated; use EncodedAuthenticationParameterSupport");
+    }
+
+    /**
+     * Initializes the configured flow.
+     *
+     * @throws PulsarClientException if no flow has been configured or the flow fails to initialize
+     */
+    @Override
+    public void start() throws PulsarClientException {
+        requireFlow().initialize();
+    }
+
+    /**
+     * Returns authentication data for the cached token, first acquiring a new token from the flow
+     * when none is cached or the cached one has expired.
+     *
+     * @throws PulsarClientException if no flow has been configured or authentication fails
+     */
+    @Override
+    public synchronized AuthenticationDataProvider getAuthData() throws PulsarClientException {
+        if (this.cachedToken == null || this.cachedToken.isExpired()) {
+            TokenResult tr = requireFlow().authenticate();
+            this.cachedToken = new CachedToken(tr);
+        }
+        return this.cachedToken.getAuthData();
+    }
+
+    /**
+     * Closes the flow, if one was configured.
+     */
+    @Override
+    public void close() throws IOException {
+        // a provider that was never (successfully) configured has nothing to close
+        if (flow != null) {
+            flow.close();
+        }
+    }
+
+    /**
+     * Returns the configured flow, failing with a descriptive error instead of a
+     * NullPointerException when the provider has not been configured.
+     */
+    private Flow requireFlow() throws PulsarClientException {
+        if (flow == null) {
+            throw new PulsarClientException.AuthenticationException(
+                    "OAuth 2.0 authentication has not been configured");
+        }
+        return flow;
+    }
+
+    /**
+     * A token result together with its adjusted expiry instant and ready-to-use authentication data.
+     */
+    @Data
+    class CachedToken {
+        private final TokenResult latest;
+        private final Instant expiresAt;
+        private final AuthenticationDataOAuth2 authData;
+
+        public CachedToken(TokenResult latest) {
+            this.latest = latest;
+            // expire early (after EXPIRY_ADJUSTMENT of the lifetime) so that a token close to
+            // its real expiry is not handed out
+            int adjustedExpiresIn = (int) (latest.getExpiresIn() * EXPIRY_ADJUSTMENT);
+            this.expiresAt = AuthenticationOAuth2.this.clock.instant().plusSeconds(adjustedExpiresIn);
+            this.authData = new AuthenticationDataOAuth2(latest.getAccessToken());
+        }
+
+        /** @return {@code true} once the enclosing provider's clock is past the adjusted expiry */
+        public boolean isExpired() {
+            return AuthenticationOAuth2.this.clock.instant().isAfter(this.expiresAt);
+        }
+    }
+}
+
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
new file mode 100644
index 0000000..13bf0f5
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/ClientCredentialsFlow.java
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchangeRequest;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.ClientCredentialsExchanger;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenClient;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenExchangeException;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.charset.StandardCharsets;
+import java.util.Map;
+import lombok.Builder;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.http.entity.ContentType;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * Implementation of OAuth 2.0 Client Credentials flow.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
+ */
+@Slf4j
+class ClientCredentialsFlow extends FlowBase {
+ public static final String CONFIG_PARAM_ISSUER_URL = "issuerUrl";
+ public static final String CONFIG_PARAM_AUDIENCE = "audience";
+ public static final String CONFIG_PARAM_KEY_FILE = "privateKey";
+
+ private static final long serialVersionUID = 1L;
+
+ private final String audience;
+ private final String privateKey;
+
+ private transient ClientCredentialsExchanger exchanger;
+
+ @Builder
+ public ClientCredentialsFlow(URL issuerUrl, String audience, String privateKey) {
+ super(issuerUrl);
+ this.audience = audience;
+ this.privateKey = privateKey;
+ }
+
+ @Override
+ public void initialize() throws PulsarClientException {
+ super.initialize();
+ assert this.metadata != null;
+
+ URL tokenUrl = this.metadata.getTokenEndpoint();
+ this.exchanger = new TokenClient(tokenUrl);
+ }
+
+ public TokenResult authenticate() throws PulsarClientException {
+ // read the private key from storage
+ KeyFile keyFile;
+ try {
+ keyFile = loadPrivateKey(this.privateKey);
+ } catch (IOException e) {
+ throw new PulsarClientException.AuthenticationException("Unable to read private key: " + e.getMessage());
+ }
+
+ // request an access token using client credentials
+ ClientCredentialsExchangeRequest req = ClientCredentialsExchangeRequest.builder()
+ .clientId(keyFile.getClientId())
+ .clientSecret(keyFile.getClientSecret())
+ .audience(this.audience)
+ .build();
+ TokenResult tr;
+ try {
+ tr = this.exchanger.exchangeClientCredentials(req);
+ } catch (TokenExchangeException | IOException e) {
+ throw new PulsarClientException.AuthenticationException("Unable to obtain an access token: "
+ + e.getMessage());
+ }
+
+ return tr;
+ }
+
+ @Override
+ public void close() {
+ exchanger.close();
+ }
+
+ /**
+ * Constructs a {@link ClientCredentialsFlow} from configuration parameters.
+ * @param params
+ * @return
+ */
+ public static ClientCredentialsFlow fromParameters(Map<String, String> params) {
+ URL issuerUrl = parseParameterUrl(params, CONFIG_PARAM_ISSUER_URL);
+ String audience = parseParameterString(params, CONFIG_PARAM_AUDIENCE);
+ String privateKeyUrl = parseParameterString(params, CONFIG_PARAM_KEY_FILE);
+ return ClientCredentialsFlow.builder()
+ .issuerUrl(issuerUrl)
+ .audience(audience)
+ .privateKey(privateKeyUrl)
+ .build();
+ }
+
+ /**
+ * Loads the private key from the given URL.
+ * @param privateKeyURL
+ * @return
+ * @throws IOException
+ */
+ private static KeyFile loadPrivateKey(String privateKeyURL) throws IOException {
+ try {
+ URLConnection urlConnection = new org.apache.pulsar.client.api.url.URL(privateKeyURL).openConnection();
+
+ String protocol = urlConnection.getURL().getProtocol();
+ String contentType = urlConnection.getContentType();
+ if ("data".equals(protocol) && !ContentType.APPLICATION_JSON.getMimeType().equals(contentType)) {
+ throw new IllegalArgumentException(
+ "Unsupported media type or encoding format: " + urlConnection.getContentType());
+ }
+ KeyFile privateKey;
+ try (Reader r = new InputStreamReader((InputStream) urlConnection.getContent(), StandardCharsets.UTF_8)) {
+ privateKey = KeyFile.fromJson(r);
+ }
+ return privateKey;
+ } catch (URISyntaxException | InstantiationException | IllegalAccessException e) {
+ throw new IOException("Invalid privateKey format", e);
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
new file mode 100644
index 0000000..b572325
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Flow.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import java.io.Serializable;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * An OAuth 2.0 authorization flow.
+ */
+interface Flow extends Serializable {
+
+ /**
+ * Initializes the authorization flow.
+ * @throws PulsarClientException if the flow could not be initialized.
+ */
+ void initialize() throws PulsarClientException;
+
+ /**
+ * Acquires an access token from the OAuth 2.0 authorization server.
+ * @return a token result including an access token and optionally a refresh token.
+ * @throws PulsarClientException if authentication failed.
+ */
+ TokenResult authenticate() throws PulsarClientException;
+
+ /**
+ * Closes the authorization flow.
+ */
+ void close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
new file mode 100644
index 0000000..0f47121
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/FlowBase.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.DefaultMetadataResolver;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.Metadata;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.MetadataResolver;
+import java.io.IOException;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.Map;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang.StringUtils;
+import org.apache.pulsar.client.api.PulsarClientException;
+
+/**
+ * An abstract OAuth 2.0 authorization flow.
+ */
+@Slf4j
+abstract class FlowBase implements Flow {
+
+ private static final long serialVersionUID = 1L;
+
+ protected final URL issuerUrl;
+
+ protected transient Metadata metadata;
+
+ protected FlowBase(URL issuerUrl) {
+ this.issuerUrl = issuerUrl;
+ }
+
+ public void initialize() throws PulsarClientException {
+ try {
+ this.metadata = createMetadataResolver().resolve();
+ } catch (IOException e) {
+ log.error("Unable to retrieve OAuth 2.0 server metadata", e);
+ throw new PulsarClientException.AuthenticationException("Unable to retrieve OAuth 2.0 server metadata");
+ }
+ }
+
+ protected MetadataResolver createMetadataResolver() {
+ return DefaultMetadataResolver.fromIssuerUrl(issuerUrl);
+ }
+
+ static String parseParameterString(Map<String, String> params, String name) {
+ String s = params.get(name);
+ if (StringUtils.isEmpty(s)) {
+ throw new IllegalArgumentException("Required configuration parameter: " + name);
+ }
+ return s;
+ }
+
+ static URL parseParameterUrl(Map<String, String> params, String name) {
+ String s = params.get(name);
+ if (StringUtils.isEmpty(s)) {
+ throw new IllegalArgumentException("Required configuration parameter: " + name);
+ }
+ try {
+ return new URL(s);
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException("Malformed configuration parameter: " + name);
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
new file mode 100644
index 0000000..b4a6510
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/KeyFile.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.Reader;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+
+/**
+ * A JSON object representing a credentials file.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class KeyFile {
+
+ private static ObjectMapper objectMapper = new ObjectMapper();
+
+ @JsonProperty("type")
+ private String type;
+
+ @JsonProperty("client_id")
+ private String clientId;
+
+ @JsonProperty("client_secret")
+ private String clientSecret;
+
+ @JsonProperty("client_email")
+ private String clientEmail;
+
+ @JsonProperty("issuer_url")
+ private String issuerUrl;
+
+ public String toJson() throws IOException {
+ return objectMapper.writeValueAsString(this);
+ }
+
+ public static KeyFile fromJson(String value) throws IOException {
+ return objectMapper.readValue(value, KeyFile.class);
+ }
+
+ public static KeyFile fromJson(Reader value) throws IOException {
+ return objectMapper.readValue(value, KeyFile.class);
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md
new file mode 100644
index 0000000..cca7973
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/Readme.md
@@ -0,0 +1,94 @@
+# Pulsar Client Authentication Plugin for OAuth 2.0
+
+Pulsar supports authenticating clients using OAuth 2.0 access tokens.
+
+ You can use tokens to identify a Pulsar client and associate it with some "principal" (or "role") that is permitted
+ to perform certain actions (e.g. publish to a topic or consume from a topic).
+
+ This module provides the Pulsar client authentication plugin for OAuth 2.0. The client first communicates with the OAuth 2.0 server
+ to obtain an `access token`, and then passes this `access token` to the Pulsar broker for authentication.
+ The broker side can therefore still use `org.apache.pulsar.broker.authentication.AuthenticationProviderToken`;
+ users can also add their own `AuthenticationProvider` to work with this module.
+
+## Provider Configuration
+This library allows you to authenticate using an access token that is obtained from an OAuth 2.0 authorization service,
+which acts as a _token issuer_.
+
+### Authentication Types
+The authentication type determines how to obtain an access token via an OAuth 2.0 authorization flow.
+
+#### Client Credentials
+The following parameters are supported:
+
+| Parameter | Description | Example |
+|---|---|---|
+| `type` | Oauth 2.0 auth type. Optional. | default: `client_credentials` |
+| `issuerUrl` | URL of the provider which allows Pulsar to obtain an access token. Required. | `https://accounts.google.com` |
+ | `privateKey` | URL to a credentials file in JSON format (see below). Required. | See "Supported Pattern Formats" |
+| `audience` | An OAuth 2.0 "resource server" identifier for the Pulsar cluster. Required. | `https://broker.example.com` |
+
+### Supported Pattern Formats of `privateKey`
+ The `privateKey` parameter, which points to the client credentials, supports the following three pattern formats:
+
+- `file:///path/to/file`
+- `file:/path/to/file`
+- `data:application/json;base64,<base64-encoded value>`
+
+The credentials file contains service account credentials for use with the Client Credentials authentication type.
+
+ For example, a credentials file `credentials_file.json` looks like this:
+```json
+{
+ "type": "client_credentials",
+ "client_id": "d9ZyX97q1ef8Cr81WHVC4hFQ64vSlDK3",
+ "client_secret": "on1uJ...k6F6R",
+ "client_email": "1234567890-abcdefghijklmnopqrstuvwxyz@developer.gserviceaccount.com",
+ "issuer_url": "https://accounts.google.com"
+}
+```
+
+ The default type is `client_credentials`, and for this type, the fields "client_id" and "client_secret" are required.
+
+### Example for a typical original Oauth2 request mapping
+
+ A typical raw OAuth 2.0 request, which is used to get an access token from the OAuth 2.0 server, looks like this:
+
+```bash
+curl --request POST \
+ --url https://dev-kt-aa9ne.us.auth0.com/oauth/token \
+ --header 'content-type: application/json' \
+ --data '{
+ "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+ "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+ "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
+ "grant_type":"client_credentials"}'
+```
+
+In which,
+- `issuerUrl` parameter in this plugin is mapped to `--url https://dev-kt-aa9ne.us.auth0.com/oauth/token`
+ - `privateKey` file parameter in this plugin should contain at least the fields `client_id` and `client_secret`.
+- `audience` parameter in this plugin is mapped to `"audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"`
+
+## Pulsar Client Config
+You can use the provider with the following Pulsar clients.
+
+### Java
+You can use the factory method:
+```java
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://broker.example.com:6650/")
+ .authentication(
+ AuthenticationFactoryOAuth2.clientCredentials(this.issuerUrl, this.credentialsUrl, this.audience))
+ .build();
+```
+
+Similarly, you can use encoded parameters:
+```java
+Authentication auth = AuthenticationFactory
+ .create(AuthenticationOAuth2.class.getName(), "{\"type\":\"client_credentials\",\"privateKey\":\"...\",\"issuerUrl\":\"...\",\"audience\":\"...\"}");
+
+PulsarClient client = PulsarClient.builder()
+ .serviceUrl("pulsar://broker.example.com:6650/")
+ .authentication(auth)
+ .build();
+```
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java
new file mode 100644
index 0000000..3beeda0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
new file mode 100644
index 0000000..7c14296
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchangeRequest.java
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Builder;
+import lombok.Data;
+
+/**
+ * A token request based on the exchange of client credentials.
+ *
+ * @see <a href="https://tools.ietf.org/html/rfc6749#section-4.4">OAuth 2.0 RFC 6749, section 4.4</a>
+ */
+@Data
+@Builder
+public class ClientCredentialsExchangeRequest {
+
+ @JsonProperty("client_id")
+ private String clientId;
+
+ @JsonProperty("client_secret")
+ private String clientSecret;
+
+ @JsonProperty("audience")
+ private String audience;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
new file mode 100644
index 0000000..e6a956a
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/ClientCredentialsExchanger.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.IOException;
+
+/**
+ * An interface for exchanging client credentials for an access token.
+ */
+public interface ClientCredentialsExchanger {
+ /**
+ * Requests an exchange of client credentials for an access token.
+ * @param req the request details.
+ * @return an access token.
+ * @throws TokenExchangeException if the OAuth server returned a detailed error.
+ * @throws IOException if a general IO error occurred.
+ */
+ TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
+ throws TokenExchangeException, IOException;
+
+ /**
+ * Closes the exchanger.
+ */
+ void close();
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
new file mode 100644
index 0000000..d16ce8b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/DefaultMetadataResolver.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStream;
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.time.Duration;
+
+/**
+ * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ */
+public class DefaultMetadataResolver implements MetadataResolver {
+
+ private final URL metadataUrl;
+ private final ObjectReader objectReader;
+ private Duration connectTimeout;
+ private Duration readTimeout;
+
+ public DefaultMetadataResolver(URL metadataUrl) {
+ this.metadataUrl = metadataUrl;
+ this.objectReader = new ObjectMapper().readerFor(Metadata.class);
+ }
+
+ public DefaultMetadataResolver withConnectTimeout(Duration connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ return this;
+ }
+
+ public DefaultMetadataResolver withReadTimeout(Duration readTimeout) {
+ this.readTimeout = readTimeout;
+ return this;
+ }
+
+ /**
+ * Resolves the authorization metadata.
+ * @return metadata
+ * @throws IOException if the metadata could not be resolved.
+ */
+ public Metadata resolve() throws IOException {
+ try {
+ URLConnection c = this.metadataUrl.openConnection();
+ if (connectTimeout != null) {
+ c.setConnectTimeout((int) connectTimeout.toMillis());
+ }
+ if (readTimeout != null) {
+ c.setReadTimeout((int) readTimeout.toMillis());
+ }
+ c.setRequestProperty("Accept", "application/json");
+
+ Metadata metadata;
+ try (InputStream inputStream = c.getInputStream()) {
+ metadata = this.objectReader.readValue(inputStream);
+ }
+ return metadata;
+
+ } catch (IOException e) {
+ throw new IOException("Cannot obtain authorization metadata from " + metadataUrl.toString(), e);
+ }
+ }
+
+ /**
+ * Gets a well-known metadata URL for the given OAuth issuer URL.
+ * @param issuerUrl The authorization server's issuer identifier
+ * @return a resolver
+ */
+ public static DefaultMetadataResolver fromIssuerUrl(URL issuerUrl) {
+ return new DefaultMetadataResolver(getWellKnownMetadataUrl(issuerUrl));
+ }
+
+ /**
+ * Gets a well-known metadata URL for the given OAuth issuer URL.
+ * @see <a href="https://tools.ietf.org/id/draft-ietf-oauth-discovery-08.html#ASConfig">
+ * OAuth Discovery: Obtaining Authorization Server Metadata</a>
+ * @param issuerUrl The authorization server's issuer identifier
+ * @return a URL
+ */
+ public static URL getWellKnownMetadataUrl(URL issuerUrl) {
+ try {
+ return new URL(issuerUrl, "/.well-known/openid-configuration");
+ } catch (MalformedURLException e) {
+ throw new IllegalArgumentException(e);
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java
new file mode 100644
index 0000000..93f65be
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/Metadata.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.net.URL;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Represents OAuth 2.0 Server Metadata.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class Metadata {
+ @JsonProperty("issuer")
+ private URL issuer;
+
+ @JsonProperty("authorization_endpoint")
+ private URL authorizationEndpoint;
+
+ @JsonProperty("token_endpoint")
+ private URL tokenEndpoint;
+
+ @JsonProperty("userinfo_endpoint")
+ private URL userInfoEndpoint;
+
+ @JsonProperty("revocation_endpoint")
+ private URL revocationEndpoint;
+
+ @JsonProperty("jwks_uri")
+ private URL jwksUri;
+
+ @JsonProperty("device_authorization_endpoint")
+ private URL deviceAuthorizationEndpoint;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java
new file mode 100644
index 0000000..85a6a0b
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/MetadataResolver.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import java.io.IOException;
+
+/**
+ * Resolves OAuth 2.0 authorization server metadata as described in RFC 8414.
+ */
+public interface MetadataResolver {
+ Metadata resolve() throws IOException;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
new file mode 100644
index 0000000..715579d
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenClient.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.io.Reader;
+import java.net.HttpURLConnection;
+import java.net.URL;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.http.Consts;
+import org.apache.http.HttpEntity;
+import org.apache.http.NameValuePair;
+import org.apache.http.StatusLine;
+import org.apache.http.client.ClientProtocolException;
+import org.apache.http.client.HttpResponseException;
+import org.apache.http.client.entity.UrlEncodedFormEntity;
+import org.apache.http.client.methods.CloseableHttpResponse;
+import org.apache.http.client.methods.HttpPost;
+import org.apache.http.entity.ContentType;
+import org.apache.http.impl.client.CloseableHttpClient;
+import org.apache.http.impl.client.HttpClientBuilder;
+import org.apache.http.message.BasicNameValuePair;
+import org.apache.http.util.EntityUtils;
+
+/**
+ * A client for an OAuth 2.0 token endpoint.
+ */
+public class TokenClient implements AutoCloseable, ClientCredentialsExchanger {
+
+ private static final ObjectReader resultReader;
+ private static final ObjectReader errorReader;
+
+ static {
+ resultReader = new ObjectMapper().readerFor(TokenResult.class);
+ errorReader = new ObjectMapper().readerFor(TokenError.class);
+ }
+
+ private final URL tokenUrl;
+ private final CloseableHttpClient httpclient;
+
+ public TokenClient(URL tokenUrl) {
+ this.tokenUrl = tokenUrl;
+ this.httpclient = HttpClientBuilder.create().useSystemProperties().disableCookieManagement().build();
+ }
+
+ public void close() {
+ }
+
+ /**
+ * Performs a token exchange using client credentials.
+ * @param req the client credentials request details.
+ * @return a token result
+ * @throws TokenExchangeException
+ */
+ public TokenResult exchangeClientCredentials(ClientCredentialsExchangeRequest req)
+ throws TokenExchangeException, IOException {
+ List<NameValuePair> params = new ArrayList<>(4);
+ params.add(new BasicNameValuePair("grant_type", "client_credentials"));
+ params.add(new BasicNameValuePair("client_id", req.getClientId()));
+ params.add(new BasicNameValuePair("client_secret", req.getClientSecret()));
+ params.add(new BasicNameValuePair("audience", req.getAudience()));
+ HttpPost post = new HttpPost(tokenUrl.toString());
+ post.setHeader("Accept", ContentType.APPLICATION_JSON.getMimeType());
+ post.setEntity(new UrlEncodedFormEntity(params, Consts.UTF_8));
+
+ try (CloseableHttpResponse response = httpclient.execute(post)) {
+ StatusLine status = response.getStatusLine();
+ HttpEntity entity = response.getEntity();
+ try {
+ switch(status.getStatusCode()) {
+ case HttpURLConnection.HTTP_OK:
+ return readResponse(entity, resultReader);
+ case HttpURLConnection.HTTP_BAD_REQUEST:
+ case HttpURLConnection.HTTP_UNAUTHORIZED:
+ throw new TokenExchangeException(readResponse(entity, errorReader));
+ default:
+ throw new HttpResponseException(status.getStatusCode(), status.getReasonPhrase());
+ }
+ } finally {
+ EntityUtils.consume(entity);
+ }
+ }
+ }
+
+ private static <T> T readResponse(HttpEntity entity, ObjectReader objectReader) throws IOException {
+ ContentType contentType = ContentType.getOrDefault(entity);
+ if (!ContentType.APPLICATION_JSON.getMimeType().equals(contentType.getMimeType())) {
+ throw new ClientProtocolException("Unsupported content type: " + contentType.getMimeType());
+ }
+ Charset charset = contentType.getCharset();
+ if (charset == null) {
+ charset = StandardCharsets.UTF_8;
+ }
+ try (Reader reader = new InputStreamReader(entity.getContent(), charset)) {
+ @SuppressWarnings("unchecked") T obj = (T) objectReader.readValue(reader);
+ return obj;
+ }
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java
new file mode 100644
index 0000000..5f050a1
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenError.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * Represents an error returned from an OAuth 2.0 token endpoint.
+ */
+@Data
+@NoArgsConstructor
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class TokenError {
+ @JsonProperty("error")
+ private String error;
+
+ @JsonProperty("error_description")
+ private String errorDescription;
+
+ @JsonProperty("error_uri")
+ private String errorUri;
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java
new file mode 100644
index 0000000..286051f
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenExchangeException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+/**
+ * Indicates a token exchange failure; the message is formatted as "errorDescription (error)" from the given TokenError.
+ */
+public class TokenExchangeException extends Exception {
+ private TokenError error; // NOTE(review): TokenError is not declared Serializable in this patch, Exception is - confirm
+
+ public TokenExchangeException(TokenError error) { // error is dereferenced on the next line: null throws NPE
+ super(String.format("%s (%s)", error.getErrorDescription(), error.getError()));
+ this.error = error;
+ }
+
+ public TokenError getError() { // returns the same TokenError instance that was passed to the constructor
+ return error;
+ }
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java
new file mode 100644
index 0000000..8b333c0
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/TokenResult.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import java.io.Serializable;
+import lombok.AllArgsConstructor;
+import lombok.Builder;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+/**
+ * The result of a token exchange request: a serializable value object bound from JSON by the annotations below.
+ */
+@Data
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor // presumably needed because @Builder is combined with @NoArgsConstructor - see Lombok docs
+@JsonIgnoreProperties(ignoreUnknown = true) // JSON members without a matching field are skipped, not rejected
+public class TokenResult implements Serializable {
+ private static final long serialVersionUID = 1L;
+
+ @JsonProperty("access_token")
+ private String accessToken; // JSON "access_token"
+
+ @JsonProperty("id_token")
+ private String idToken; // JSON "id_token"
+
+ @JsonProperty("refresh_token")
+ private String refreshToken; // JSON "refresh_token"
+
+ @JsonProperty("expires_in")
+ private int expiresIn; // JSON "expires_in"; presumably a lifetime in seconds (RFC 6749 section 5.1) - confirm
+}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java
new file mode 100644
index 0000000..2068111
--- /dev/null
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth/oauth2/protocol/package-info.java
@@ -0,0 +1,19 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2.protocol;
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
new file mode 100644
index 0000000..f45c2c0
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/AuthenticationOAuth2Test.java
@@ -0,0 +1,122 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import static org.mockito.Mockito.doReturn;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.testng.Assert.assertEquals;
+import static org.testng.Assert.assertNotNull;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneOffset;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.pulsar.client.api.AuthenticationDataProvider;
+import org.apache.pulsar.client.impl.auth.oauth2.protocol.TokenResult;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+/**
+ * Tests {@link AuthenticationOAuth2} using a Mockito mock of {@link Flow} and a manually advanced {@link MockClock}.
+ */
+public class AuthenticationOAuth2Test {
+ private static final String TEST_ACCESS_TOKEN = "x.y.z";
+ private static final int TEST_EXPIRES_IN = 60; // passed to Duration.ofSeconds in testGetAuthData
+
+ private MockClock clock; // only moves when a test calls advance/setInstant
+ private Flow flow; // Mockito mock, recreated before every test method
+ private AuthenticationOAuth2 auth; // object under test, built from the mock flow and the mock clock
+
+ @BeforeMethod
+ public void before() {
+ this.clock = new MockClock(Instant.EPOCH, ZoneOffset.UTC); // each test starts at the epoch, in UTC
+ this.flow = mock(Flow.class);
+ this.auth = new AuthenticationOAuth2(flow, this.clock);
+ }
+
+ @Test
+ public void testGetAuthMethodName() {
+ assertEquals(this.auth.getAuthMethodName(), "token");
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*No.*")
+ public void testConfigureNoParams() throws Exception {
+ this.auth.configure(""); // empty parameter string
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Malformed.*")
+ public void testConfigureMalformed() throws Exception {
+ this.auth.configure("{garbage}"); // not valid JSON
+ }
+
+ @Test(expectedExceptions = IllegalArgumentException.class, expectedExceptionsMessageRegExp = ".*Required.*")
+ public void testConfigureRequired() throws Exception {
+ this.auth.configure("{}"); // valid JSON object with none of the keys that testConfigure supplies
+ }
+
+ @Test
+ public void testConfigure() throws Exception {
+ Map<String, String> params = new HashMap<>();
+ params.put("type", "client_credentials");
+ params.put("privateKey", "data:base64,e30="); // "e30=" is the base64 encoding of "{}"
+ params.put("issuerUrl", "http://localhost");
+ params.put("audience", "http://localhost");
+ ObjectMapper mapper = new ObjectMapper();
+ String authParams = mapper.writeValueAsString(params); // configure() takes the parameters as a JSON string
+ this.auth.configure(authParams);
+ assertNotNull(this.auth.flow); // NOTE(review): a mock flow was already passed in before() - may pass trivially
+ }
+
+ @Test
+ public void testStart() throws Exception {
+ this.auth.start();
+ verify(this.flow).initialize(); // start() is expected to call Flow.initialize()
+ }
+
+ @Test
+ public void testGetAuthData() throws Exception {
+ AuthenticationDataProvider data;
+ TokenResult tr = TokenResult.builder().accessToken(TEST_ACCESS_TOKEN).expiresIn(TEST_EXPIRES_IN).build();
+ doReturn(tr).when(this.flow).authenticate(); // the mock hands back the same TokenResult on every call
+ data = this.auth.getAuthData();
+ verify(this.flow, times(1)).authenticate(); // first request goes through to the flow
+ assertEquals(data.getCommandData(), tr.getAccessToken());
+
+ // cache hit: the clock has not moved, so no further authenticate() call is expected
+ data = this.auth.getAuthData();
+ verify(this.flow, times(1)).authenticate(); // total is still one call
+ assertEquals(data.getCommandData(), tr.getAccessToken());
+
+ // cache miss: after the clock advances by TEST_EXPIRES_IN seconds a new authenticate() call is expected
+ clock.advance(Duration.ofSeconds(TEST_EXPIRES_IN));
+ data = this.auth.getAuthData();
+ verify(this.flow, times(2)).authenticate(); // total is now two calls
+ assertEquals(data.getCommandData(), tr.getAccessToken());
+ }
+
+ @Test
+ public void testClose() throws Exception {
+ this.auth.close();
+ verify(this.flow).close(); // close() is expected to call Flow.close()
+ }
+}
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java
new file mode 100644
index 0000000..1e23311
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/auth/oauth2/MockClock.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl.auth.oauth2;
+
+import java.io.Serializable;
+import java.time.Clock;
+import java.time.Duration;
+import java.time.Instant;
+import java.time.ZoneId;
+
+/**
+ * A clock where the current instant is manually adjustable; it changes only through setInstant and advance.
+ */
+public class MockClock extends Clock implements Serializable {
+ private static final long serialVersionUID = 1L;
+ private Instant instant; // current reading; reassigned only by setInstant and advance
+ private final ZoneId zone;
+
+ public MockClock(Instant fixedInstant, ZoneId zone) {
+ this.instant = fixedInstant;
+ this.zone = zone;
+ }
+
+ @Override
+ public ZoneId getZone() {
+ return zone;
+ }
+
+ @Override
+ public Clock withZone(ZoneId zone) {
+ if (zone.equals(this.zone)) {
+ return this; // same zone requested: no copy is made
+ }
+ return new MockClock(instant, zone); // the copy starts at the current instant and is adjusted independently
+ }
+
+ @Override
+ public long millis() {
+ return instant.toEpochMilli(); // the same reading as instant(), converted to epoch milliseconds
+ }
+
+ @Override
+ public Instant instant() {
+ return instant;
+ }
+
+ /**
+ * Sets the clock to the given instant.
+ * @param fixedInstant the instant
+ */
+ public void setInstant(Instant fixedInstant) {
+ this.instant = fixedInstant;
+ }
+
+ /**
+ * Advances the clock by the given duration.
+ * @param duration the duration
+ */
+ public void advance(Duration duration) {
+ this.instant = this.instant.plus(duration);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj instanceof MockClock) {
+ MockClock other = (MockClock) obj;
+ return instant.equals(other.instant) && zone.equals(other.zone); // equal only at the same instant and zone
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() {
+ return instant.hashCode() ^ zone.hashCode(); // uses the mutable instant, so the hash changes when the clock moves
+ }
+
+ @Override
+ public String toString() {
+ return "MockClock[" + instant + "," + zone + "]";
+ }
+}