You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/15 08:36:34 UTC

[pulsar-client-go] branch master updated: Support oauth2 authentication for pulsar-client-go (#313)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new b9f8c5c  Support oauth2 authentication for pulsar-client-go (#313)
b9f8c5c is described below

commit b9f8c5cedefb1bfb82faee09ce06f48c8f7300f5
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed Jul 15 16:36:26 2020 +0800

    Support oauth2 authentication for pulsar-client-go (#313)
    
    * Authentication provider for OAuth 2.0
    - based on cloud-cli @ bc645b16ca7b7474b132ee1da8b56da35025a616
    
    * Add tests and Update license
    
    * Revert zstd version
    
    * Refactor to support multiple issuers.
    - decouple the issuer parameter from the audience
    - use issuer information that is in the keyfile
    
    * Address comments
    
    * Add tests
    
    * Change clock package
    
    Co-authored-by: Eron Wright <ew...@streamnative.io>
---
 go.mod                                      |   7 +-
 go.sum                                      |  64 +++++-
 oauth2/auth.go                              | 120 ++++++++++
 oauth2/auth_suite_test.go                   |  65 ++++++
 oauth2/authorization_tokenretriever.go      | 338 ++++++++++++++++++++++++++++
 oauth2/authorization_tokenretriever_test.go | 338 ++++++++++++++++++++++++++++
 oauth2/cache/cache.go                       | 142 ++++++++++++
 oauth2/client_credentials_flow.go           | 158 +++++++++++++
 oauth2/client_credentials_flow_test.go      | 183 +++++++++++++++
 oauth2/client_credentials_provider.go       |  66 ++++++
 oauth2/clock/clock.go                       |  98 ++++++++
 oauth2/clock/testing/fake_clock.go          | 275 ++++++++++++++++++++++
 oauth2/config_tokenprovider.go              |  57 +++++
 oauth2/config_tokenprovider_test.go         |  91 ++++++++
 oauth2/device_code_flow.go                  | 203 +++++++++++++++++
 oauth2/device_code_flow_test.go             | 230 +++++++++++++++++++
 oauth2/device_code_provider.go              | 133 +++++++++++
 oauth2/go.mod                               |  12 +
 oauth2/go.sum                               | 113 ++++++++++
 oauth2/oidc_endpoint_provider.go            |  58 +++++
 oauth2/oidc_endpoint_provider_test.go       |  92 ++++++++
 oauth2/store/keyring.go                     | 194 ++++++++++++++++
 oauth2/store/memory.go                      |  87 +++++++
 oauth2/store/store.go                       |  45 ++++
 pulsar/client.go                            |   5 +
 pulsar/client_impl_test.go                  |  94 ++++++++
 pulsar/internal/auth/oauth2.go              | 145 ++++++++++++
 pulsar/internal/auth/oauth2_test.go         | 117 ++++++++++
 pulsar/internal/auth/token.go               |   2 +-
 29 files changed, 3524 insertions(+), 8 deletions(-)

diff --git a/go.mod b/go.mod
index afae8cc..969b35a 100644
--- a/go.mod
+++ b/go.mod
@@ -1,9 +1,10 @@
 module github.com/apache/pulsar-client-go
 
-go 1.12
+go 1.13
 
 require (
 	github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
+	github.com/apache/pulsar-client-go/oauth2 v0.0.0-00010101000000-000000000000
 	github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
 	github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
 	github.com/gogo/protobuf v1.3.1
@@ -11,7 +12,7 @@ require (
 	github.com/klauspost/compress v1.10.8
 	github.com/kr/pretty v0.2.0 // indirect
 	github.com/pierrec/lz4 v2.0.5+incompatible
-	github.com/pkg/errors v0.8.1
+	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.7.1
 	github.com/sirupsen/logrus v1.4.2
 	github.com/spaolacci/murmur3 v1.1.0
@@ -20,3 +21,5 @@ require (
 	github.com/stretchr/testify v1.4.0
 	github.com/yahoo/athenz v1.8.55
 )
+
+replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/go.sum b/go.sum
index 9f51dbb..e766ca5 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,6 @@
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
+github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
 github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
@@ -19,16 +22,27 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D
 github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
 github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
+github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
 github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
 github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
 github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
 github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
 github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
 github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
 github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
 github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
@@ -49,6 +63,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
 github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
 github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
 github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
 github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
 github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
@@ -56,6 +73,8 @@ github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6Pyu
 github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
 github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
 github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
 github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
 github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
 github.com/klauspost/compress v1.10.8 h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
@@ -71,16 +90,30 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
 github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
 github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
 github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
 github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
 github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
+github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
 github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
+github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
 github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
 github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
 github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -112,7 +145,6 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
 github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
 github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
 github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -120,28 +152,43 @@ github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
 github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
 golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
 golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
 golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
 golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
 golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
 golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
+golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
 golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
 golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
 golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
 golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
 golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -149,6 +196,8 @@ golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtn
 golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
 golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
 google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
 google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
 google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -158,13 +207,18 @@ google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyz
 google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
 gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
 gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
 gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
 gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
 gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
 gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
 gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/oauth2/auth.go b/oauth2/auth.go
new file mode 100644
index 0000000..867afae
--- /dev/null
+++ b/oauth2/auth.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"fmt"
+	"time"
+
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+	"github.com/dgrijalva/jwt-go"
+	"golang.org/x/oauth2"
+)
+
+const (
+	ClaimNameUserName = "https://pulsar.apache.org/username"
+)
+
+// Flow abstracts an OAuth 2.0 authentication and authorization flow
+type Flow interface {
+	// Authorize obtains an authorization grant based on an OAuth 2.0 authorization flow.
+	// The method returns a grant which may contain an initial access token.
+	Authorize(audience string) (*AuthorizationGrant, error)
+}
+
+// AuthorizationGrantRefresher refreshes an OAuth 2.0 authorization grant
+type AuthorizationGrantRefresher interface {
+	// Refresh refreshes an authorization grant to contain a fresh access token
+	Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error)
+}
+
+type AuthorizationGrantType string
+
+const (
+	// GrantTypeClientCredentials represents a client credentials grant
+	GrantTypeClientCredentials AuthorizationGrantType = "client_credentials"
+
+	// GrantTypeDeviceCode represents a device code grant
+	GrantTypeDeviceCode AuthorizationGrantType = "device_code"
+)
+
+// AuthorizationGrant is a credential representing the resource owner's authorization
+// to access its protected resources, and is used by the client to obtain an access token
+type AuthorizationGrant struct {
+	// Type describes the type of authorization grant represented by this structure
+	Type AuthorizationGrantType `json:"type"`
+
+	// Audience is the intended audience of the access tokens
+	Audience string `json:"audience,omitempty"`
+
+	// ClientID is an OAuth2 client identifier used by some flows
+	ClientID string `json:"client_id,omitempty"`
+
+	// ClientCredentials is credentials data for the client credentials grant type
+	ClientCredentials *KeyFile `json:"client_credentials,omitempty"`
+
+	// TokenEndpoint is the token endpoint
+	TokenEndpoint string `json:"token_endpoint"`
+
+	// Token contains an access token in the client credentials grant type,
+	// and a refresh token in the device authorization grant type
+	Token *oauth2.Token `json:"token,omitempty"`
+}
+
+// TokenResult holds token information
+type TokenResult struct {
+	AccessToken  string `json:"access_token"`
+	IDToken      string `json:"id_token"`
+	RefreshToken string `json:"refresh_token"`
+	ExpiresIn    int    `json:"expires_in"`
+}
+
+// Issuer holds information about the issuer of tokens
+type Issuer struct {
+	IssuerEndpoint string
+	ClientID       string
+	Audience       string
+}
+
+// convertToOAuth2Token builds a bearer oauth2.Token from token; the expiry is the
+// supplied clock's current time plus token.ExpiresIn seconds.
+func convertToOAuth2Token(token *TokenResult, clk clock.Clock) oauth2.Token {
+	result := oauth2.Token{AccessToken: token.AccessToken, RefreshToken: token.RefreshToken}
+	result.TokenType = "bearer"
+	result.Expiry = clk.Now().Add(time.Duration(token.ExpiresIn) * time.Second)
+	return result
+}
+
+// ExtractUserName extracts the username claim from an authorization grant
+func ExtractUserName(token oauth2.Token) (string, error) {
+	p := jwt.Parser{}
+	claims := jwt.MapClaims{}
+	if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil {
+		return "", fmt.Errorf("unable to decode the access token: %v", err)
+	}
+	username, ok := claims[ClaimNameUserName]
+	if !ok {
+		return "", fmt.Errorf("access token doesn't contain a username claim")
+	}
+	switch v := username.(type) {
+	case string:
+		return v, nil
+	default:
+		return "", fmt.Errorf("access token contains an unsupported username claim")
+	}
+}
diff --git a/oauth2/auth_suite_test.go b/oauth2/auth_suite_test.go
new file mode 100644
index 0000000..95accff
--- /dev/null
+++ b/oauth2/auth_suite_test.go
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"context"
+	"testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+func TestAuth(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "cloud-cli Auth Suite")
+}
+
+type MockTokenExchanger struct {
+	CalledWithRequest        interface{}
+	ReturnsTokens            *TokenResult
+	ReturnsError             error
+	RefreshCalledWithRequest *RefreshTokenExchangeRequest
+}
+
+func (te *MockTokenExchanger) ExchangeCode(req AuthorizationCodeExchangeRequest) (*TokenResult, error) {
+	te.CalledWithRequest = &req
+	return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeRefreshToken(req RefreshTokenExchangeRequest) (*TokenResult, error) {
+	te.RefreshCalledWithRequest = &req
+	return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeClientCredentials(req ClientCredentialsExchangeRequest) (*TokenResult, error) {
+	te.CalledWithRequest = &req
+	return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeDeviceCode(ctx context.Context,
+	req DeviceCodeExchangeRequest) (*TokenResult, error) {
+	te.CalledWithRequest = &req
+	return te.ReturnsTokens, te.ReturnsError
+}
+
+var oidcEndpoints = OIDCWellKnownEndpoints{
+	AuthorizationEndpoint:       "http://issuer/auth/authorize",
+	TokenEndpoint:               "http://issuer/auth/token",
+	DeviceAuthorizationEndpoint: "http://issuer/auth/authorize/device",
+}
diff --git a/oauth2/authorization_tokenretriever.go b/oauth2/authorization_tokenretriever.go
new file mode 100644
index 0000000..93c1bfe
--- /dev/null
+++ b/oauth2/authorization_tokenretriever.go
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+	"net/http"
+	"net/url"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// TokenRetriever implements AuthTokenExchanger in order to facilitate getting
+// Tokens. Every exchange is a form-encoded POST to the token endpoint named in
+// the request, sent through the configured transport.
+type TokenRetriever struct {
+	// transport performs the actual HTTP round trip.
+	transport HTTPAuthTransport
+}
+
+// AuthorizationTokenResponse is the HTTP response when asking for a new token.
+// Note that not all fields will contain data based on what kind of request was
+// sent
+type AuthorizationTokenResponse struct {
+	AccessToken string `json:"access_token"`
+	// ExpiresIn is the token lifetime as reported by the server
+	// (presumably in seconds, as in OAuth 2.0 - confirm against the consumer).
+	ExpiresIn    int    `json:"expires_in"`
+	IDToken      string `json:"id_token"`
+	RefreshToken string `json:"refresh_token"`
+	// TokenType is decoded but not copied into the TokenResult built by
+	// handleAuthTokensResponse.
+	TokenType string `json:"token_type"`
+}
+
+// AuthorizationCodeExchangeRequest is used to request the exchange of an
+// authorization code for a token
+type AuthorizationCodeExchangeRequest struct {
+	// TokenEndpoint is the URL the exchange is POSTed to.
+	TokenEndpoint string
+	// ClientID is sent as the "client_id" form value.
+	ClientID string
+	// CodeVerifier is sent as the "code_verifier" form value.
+	CodeVerifier string
+	// Code is sent as the "code" form value.
+	Code string
+	// RedirectURI is sent as the "redirect_uri" form value.
+	RedirectURI string
+}
+
+// RefreshTokenExchangeRequest is used to request the exchange of a refresh
+// token for a refreshed token
+type RefreshTokenExchangeRequest struct {
+	// TokenEndpoint is the URL the exchange is POSTed to.
+	TokenEndpoint string
+	// ClientID is sent as the "client_id" form value.
+	ClientID string
+	// RefreshToken is sent as the "refresh_token" form value.
+	RefreshToken string
+}
+
+// ClientCredentialsExchangeRequest is used to request the exchange of
+// client credentials for a token
+type ClientCredentialsExchangeRequest struct {
+	// TokenEndpoint is the URL the exchange is POSTed to.
+	TokenEndpoint string
+	// ClientID is sent as the "client_id" form value.
+	ClientID string
+	// ClientSecret is sent as the "client_secret" form value.
+	ClientSecret string
+	// Audience is sent as the "audience" form value.
+	Audience string
+}
+
+// DeviceCodeExchangeRequest is used to request the exchange of
+// a device code for a token
+type DeviceCodeExchangeRequest struct {
+	// TokenEndpoint is the URL the exchange is POSTed to.
+	TokenEndpoint string
+	// ClientID is sent as the "client_id" form value.
+	ClientID string
+	// DeviceCode is sent as the "device_code" form value.
+	DeviceCode string
+	// PollInterval is how long ExchangeDeviceCode waits between two polls of
+	// the token endpoint. It is not sent to the server.
+	PollInterval time.Duration
+}
+
+// TokenErrorResponse is used to parse error responses from the token endpoint.
+// It is the wire format; handleAuthTokensResponse converts it into a
+// *TokenError before returning it to callers.
+type TokenErrorResponse struct {
+	// Error is the machine-readable error code (JSON field "error").
+	Error string `json:"error"`
+	// ErrorDescription is the optional human-readable text
+	// (JSON field "error_description").
+	ErrorDescription string `json:"error_description"`
+}
+
+// TokenError is the typed error produced when the token endpoint answers a
+// request with a JSON error document.
+type TokenError struct {
+	// ErrorCode is the machine-readable error code.
+	ErrorCode string
+	// ErrorDescription is the optional human-readable explanation.
+	ErrorDescription string
+}
+
+// Error renders the error as "<description> (<code>)", or as the bare code
+// when no description is available.
+func (e *TokenError) Error() string {
+	if e.ErrorDescription == "" {
+		return e.ErrorCode
+	}
+	return fmt.Sprintf("%s (%s)", e.ErrorDescription, e.ErrorCode)
+}
+
+// HTTPAuthTransport abstracts how an HTTP exchange request is sent and
+// received. *http.Client satisfies it; tests substitute a mock.
+type HTTPAuthTransport interface {
+	// Do sends the request and returns the response or a transport error.
+	Do(request *http.Request) (*http.Response, error)
+}
+
+// NewTokenRetriever creates a TokenRetriever that sends all of its token
+// requests through the supplied transport.
+func NewTokenRetriever(authTransport HTTPAuthTransport) *TokenRetriever {
+	retriever := new(TokenRetriever)
+	retriever.transport = authTransport
+	return retriever
+}
+
+// newExchangeCodeRequest converts an AuthorizationCodeExchangeRequest into a
+// form-encoded POST ("authorization_code" grant) against the request's token
+// endpoint. It fails only when the endpoint URL cannot be parsed.
+func (ce *TokenRetriever) newExchangeCodeRequest(
+	req AuthorizationCodeExchangeRequest) (*http.Request, error) {
+	form := url.Values{
+		"grant_type":    {"authorization_code"},
+		"client_id":     {req.ClientID},
+		"code_verifier": {req.CodeVerifier},
+		"code":          {req.Code},
+		"redirect_uri":  {req.RedirectURI},
+	}
+	payload := form.Encode()
+
+	httpReq, err := http.NewRequest("POST", req.TokenEndpoint, strings.NewReader(payload))
+	if err != nil {
+		return nil, err
+	}
+
+	httpReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+	httpReq.Header.Add("Content-Length", strconv.Itoa(len(payload)))
+	return httpReq, nil
+}
+
+// newDeviceCodeExchangeRequest converts a DeviceCodeExchangeRequest into a
+// form-encoded POST (device_code grant) against the request's token endpoint.
+// It fails only when the endpoint URL cannot be parsed.
+func (ce *TokenRetriever) newDeviceCodeExchangeRequest(
+	req DeviceCodeExchangeRequest) (*http.Request, error) {
+	form := url.Values{
+		"grant_type":  {"urn:ietf:params:oauth:grant-type:device_code"},
+		"client_id":   {req.ClientID},
+		"device_code": {req.DeviceCode},
+	}
+	payload := form.Encode()
+
+	httpReq, err := http.NewRequest("POST", req.TokenEndpoint, strings.NewReader(payload))
+	if err != nil {
+		return nil, err
+	}
+
+	httpReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+	httpReq.Header.Add("Content-Length", strconv.Itoa(len(payload)))
+	return httpReq, nil
+}
+
+// newRefreshTokenRequest converts a RefreshTokenExchangeRequest into a
+// form-encoded POST ("refresh_token" grant) against the request's token
+// endpoint. It fails only when the endpoint URL cannot be parsed.
+func (ce *TokenRetriever) newRefreshTokenRequest(req RefreshTokenExchangeRequest) (*http.Request, error) {
+	form := url.Values{
+		"grant_type":    {"refresh_token"},
+		"client_id":     {req.ClientID},
+		"refresh_token": {req.RefreshToken},
+	}
+	payload := form.Encode()
+
+	httpReq, err := http.NewRequest("POST", req.TokenEndpoint, strings.NewReader(payload))
+	if err != nil {
+		return nil, err
+	}
+
+	httpReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+	httpReq.Header.Add("Content-Length", strconv.Itoa(len(payload)))
+	return httpReq, nil
+}
+
+// newClientCredentialsRequest converts a ClientCredentialsExchangeRequest into
+// a form-encoded POST ("client_credentials" grant) against the request's token
+// endpoint. It fails only when the endpoint URL cannot be parsed.
+func (ce *TokenRetriever) newClientCredentialsRequest(req ClientCredentialsExchangeRequest) (*http.Request, error) {
+	form := url.Values{
+		"grant_type":    {"client_credentials"},
+		"client_id":     {req.ClientID},
+		"client_secret": {req.ClientSecret},
+		"audience":      {req.Audience},
+	}
+	payload := form.Encode()
+
+	httpReq, err := http.NewRequest("POST", req.TokenEndpoint, strings.NewReader(payload))
+	if err != nil {
+		return nil, err
+	}
+
+	httpReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+	httpReq.Header.Add("Content-Length", strconv.Itoa(len(payload)))
+	return httpReq, nil
+}
+
+// ExchangeCode uses the AuthCodeExchangeRequest to exchange an authorization
+// code for tokens
+func (ce *TokenRetriever) ExchangeCode(req AuthorizationCodeExchangeRequest) (*TokenResult, error) {
+	request, err := ce.newExchangeCodeRequest(req)
+	if err != nil {
+		return nil, err
+	}
+
+	response, err := ce.transport.Do(request)
+	if err != nil {
+		return nil, err
+	}
+
+	return ce.handleAuthTokensResponse(response)
+}
+
+// handleAuthTokensResponse takes care of checking an http.Response that has
+// auth tokens for errors and parsing the raw body to a TokenResult struct.
+//
+// The response body, when present, is always closed. For a non-2xx status the
+// body is decoded into a *TokenError if the response is declared as JSON;
+// otherwise a generic error carrying the status code is returned. For a 2xx
+// status the body is decoded as an AuthorizationTokenResponse.
+func (ce *TokenRetriever) handleAuthTokensResponse(resp *http.Response) (*TokenResult, error) {
+	if resp.Body != nil {
+		defer resp.Body.Close()
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode > 299 {
+		// Compare only the media type, case-insensitively, so that values such
+		// as "application/json; charset=utf-8" are still recognized as JSON.
+		mediaType := strings.TrimSpace(strings.SplitN(resp.Header.Get("Content-Type"), ";", 2)[0])
+		if resp.Body != nil && strings.EqualFold(mediaType, "application/json") {
+			er := TokenErrorResponse{}
+			if err := json.NewDecoder(resp.Body).Decode(&er); err != nil {
+				return nil, err
+			}
+			return nil, &TokenError{ErrorCode: er.Error, ErrorDescription: er.ErrorDescription}
+		}
+		return nil, fmt.Errorf("a non-success status code was received: %d", resp.StatusCode)
+	}
+
+	// A decoder over a nil reader would panic; report the problem instead.
+	if resp.Body == nil {
+		return nil, errors.New("the token response has no body")
+	}
+
+	atr := AuthorizationTokenResponse{}
+	if err := json.NewDecoder(resp.Body).Decode(&atr); err != nil {
+		return nil, err
+	}
+
+	return &TokenResult{
+		AccessToken:  atr.AccessToken,
+		IDToken:      atr.IDToken,
+		RefreshToken: atr.RefreshToken,
+		ExpiresIn:    atr.ExpiresIn,
+	}, nil
+}
+
+// ExchangeDeviceCode uses the DeviceCodeExchangeRequest to exchange a device
+// code for tokens. It polls the token endpoint until the user has authorized
+// the device, the device code expires, access is denied, the server reports
+// any other error, or ctx is cancelled.
+//
+// Between polls it waits req.PollInterval. Each "slow_down" answer raises the
+// wait by five seconds for all subsequent polls, as RFC 8628 section 3.5
+// requires. ctx is attached to every HTTP request, so cancellation also
+// aborts an in-flight poll (the transport's error is returned in that case);
+// cancellation while waiting yields the error "cancelled".
+func (ce *TokenRetriever) ExchangeDeviceCode(ctx context.Context, req DeviceCodeExchangeRequest) (*TokenResult, error) {
+	interval := req.PollInterval
+	for {
+		request, err := ce.newDeviceCodeExchangeRequest(req)
+		if err != nil {
+			return nil, err
+		}
+
+		response, err := ce.transport.Do(request.WithContext(ctx))
+		if err != nil {
+			return nil, err
+		}
+		token, err := ce.handleAuthTokensResponse(response)
+		if err == nil {
+			return token, nil
+		}
+		terr, ok := err.(*TokenError)
+		if !ok {
+			return nil, err
+		}
+		switch terr.ErrorCode {
+		case "expired_token":
+			// The user has not authorized the device quickly enough, so the device_code has expired.
+			return nil, fmt.Errorf("the device code has expired")
+		case "access_denied":
+			// The user refused to authorize the device
+			return nil, fmt.Errorf("the device was not authorized")
+		case "authorization_pending":
+			// Still waiting for the user to take action
+		case "slow_down":
+			// You are polling too fast: back off for this and all later polls.
+			interval += 5 * time.Second
+		default:
+			// Any other error code is terminal; polling again cannot succeed
+			// and would otherwise loop forever.
+			return nil, err
+		}
+
+		// An explicit timer (rather than time.After) can be released as soon
+		// as the context is cancelled.
+		timer := time.NewTimer(interval)
+		select {
+		case <-timer.C:
+		case <-ctx.Done():
+			timer.Stop()
+			return nil, errors.New("cancelled")
+		}
+	}
+}
+
+// ExchangeRefreshToken uses the RefreshTokenExchangeRequest to exchange a
+// refresh token for refreshed tokens
+func (ce *TokenRetriever) ExchangeRefreshToken(req RefreshTokenExchangeRequest) (*TokenResult, error) {
+	request, err := ce.newRefreshTokenRequest(req)
+	if err != nil {
+		return nil, err
+	}
+
+	response, err := ce.transport.Do(request)
+	if err != nil {
+		return nil, err
+	}
+
+	return ce.handleAuthTokensResponse(response)
+}
+
+// ExchangeClientCredentials uses the ClientCredentialsExchangeRequest to exchange
+// client credentials for tokens
+func (ce *TokenRetriever) ExchangeClientCredentials(req ClientCredentialsExchangeRequest) (*TokenResult, error) {
+	request, err := ce.newClientCredentialsRequest(req)
+	if err != nil {
+		return nil, err
+	}
+
+	response, err := ce.transport.Do(request)
+	if err != nil {
+		return nil, err
+	}
+
+	return ce.handleAuthTokensResponse(response)
+}
diff --git a/oauth2/authorization_tokenretriever_test.go b/oauth2/authorization_tokenretriever_test.go
new file mode 100644
index 0000000..b08f878
--- /dev/null
+++ b/oauth2/authorization_tokenretriever_test.go
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"io/ioutil"
+	"net/http"
+	"strings"
+	"time"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+// MockTransport is a scripted HTTPAuthTransport for tests.
+type MockTransport struct {
+	// Responses is a queue of canned responses; each call to Do removes and
+	// returns the first one.
+	Responses []*http.Response
+	// ReturnError is returned by Do, with a nil response, once Responses is
+	// empty.
+	ReturnError error
+}
+
+// Compile-time check that MockTransport satisfies HTTPAuthTransport.
+var _ HTTPAuthTransport = &MockTransport{}
+
+// Do ignores the request and replays the scripted responses in order. Once
+// the queue is exhausted it returns a nil response together with ReturnError.
+func (t *MockTransport) Do(req *http.Request) (*http.Response, error) {
+	if len(t.Responses) == 0 {
+		return nil, t.ReturnError
+	}
+	next, remaining := t.Responses[0], t.Responses[1:]
+	t.Responses = remaining
+	return next, nil
+}
+
+var _ = Describe("CodetokenExchanger", func() {
+	// Specs for newExchangeCodeRequest: the form body, URL and headers of the
+	// generated request, and the error path for an unparsable endpoint.
+	Describe("newExchangeCodeRequest", func() {
+		It("creates the request", func() {
+			tokenRetriever := TokenRetriever{}
+			exchangeRequest := AuthorizationCodeExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				CodeVerifier:  "Verifier",
+				Code:          "code",
+				RedirectURI:   "https://redirect",
+			}
+
+			result, err := tokenRetriever.newExchangeCodeRequest(exchangeRequest)
+
+			// Check err before touching result, which is nil on failure.
+			Expect(err).To(BeNil())
+			Expect(result.ParseForm()).To(Succeed())
+
+			Expect(result.FormValue("grant_type")).To(Equal("authorization_code"))
+			Expect(result.FormValue("client_id")).To(Equal("clientID"))
+			Expect(result.FormValue("code_verifier")).To(Equal("Verifier"))
+			Expect(result.FormValue("code")).To(Equal("code"))
+			Expect(result.FormValue("redirect_uri")).To(Equal("https://redirect"))
+			Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+
+			Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+			Expect(result.Header.Get("Content-Length")).To(Equal("117"))
+		})
+
+		It("returns an error when NewRequest returns an error", func() {
+			tokenRetriever := TokenRetriever{}
+
+			result, err := tokenRetriever.newExchangeCodeRequest(AuthorizationCodeExchangeRequest{
+				TokenEndpoint: "://issuer/oauth/token",
+			})
+
+			Expect(result).To(BeNil())
+			Expect(err).To(HaveOccurred())
+			// Match only the stable part of the message: Go 1.14 and later
+			// quote the URL inside url.Error, earlier releases do not.
+			Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		})
+	})
+
+	// Specs for handleAuthTokensResponse: success decoding, plain status
+	// errors, typed JSON errors and malformed success bodies.
+	Describe("handleAuthTokensResponse", func() {
+		It("handles the response", func() {
+			retriever := TokenRetriever{}
+			okResponse := buildResponse(200, AuthorizationTokenResponse{
+				ExpiresIn:    1,
+				AccessToken:  "myAccessToken",
+				RefreshToken: "myRefreshToken",
+			})
+			want := &TokenResult{
+				ExpiresIn:    1,
+				AccessToken:  "myAccessToken",
+				RefreshToken: "myRefreshToken",
+			}
+
+			got, err := retriever.handleAuthTokensResponse(okResponse)
+
+			Expect(err).To(BeNil())
+			Expect(got).To(Equal(want))
+		})
+
+		It("returns error when status code is not successful", func() {
+			retriever := TokenRetriever{}
+
+			got, err := retriever.handleAuthTokensResponse(buildResponse(500, nil))
+
+			Expect(got).To(BeNil())
+			Expect(err.Error()).To(Equal("a non-success status code was received: 500"))
+		})
+
+		It("returns typed error when response body contains error information", func() {
+			retriever := TokenRetriever{}
+			failure := buildResponse(400, TokenErrorResponse{Error: "test", ErrorDescription: "test description"})
+
+			got, err := retriever.handleAuthTokensResponse(failure)
+
+			Expect(got).To(BeNil())
+			Expect(err).To(Equal(&TokenError{ErrorCode: "test", ErrorDescription: "test description"}))
+			Expect(err.Error()).To(Equal("test description (test)"))
+		})
+
+		It("returns error when deserialization fails", func() {
+			retriever := TokenRetriever{}
+
+			// A JSON string is valid JSON but not a token document.
+			got, err := retriever.handleAuthTokensResponse(buildResponse(200, ""))
+
+			Expect(got).To(BeNil())
+			Expect(err.Error()).To(Equal(
+				"json: cannot unmarshal string into Go value of type oauth2.AuthorizationTokenResponse"))
+		})
+	})
+
+	// Specs for newRefreshTokenRequest: the form body, URL and headers of the
+	// generated request, and the error path for an unparsable endpoint.
+	Describe("newRefreshTokenRequest", func() {
+		It("creates the request", func() {
+			tokenRetriever := TokenRetriever{}
+			exchangeRequest := RefreshTokenExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				RefreshToken:  "refreshToken",
+			}
+
+			result, err := tokenRetriever.newRefreshTokenRequest(exchangeRequest)
+
+			// Check err before touching result, which is nil on failure.
+			Expect(err).To(BeNil())
+			Expect(result.ParseForm()).To(Succeed())
+
+			Expect(result.FormValue("grant_type")).To(Equal("refresh_token"))
+			Expect(result.FormValue("client_id")).To(Equal("clientID"))
+			Expect(result.FormValue("refresh_token")).To(Equal("refreshToken"))
+			Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+
+			Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+			Expect(result.Header.Get("Content-Length")).To(Equal("70"))
+		})
+
+		It("returns an error when NewRequest returns an error", func() {
+			tokenRetriever := TokenRetriever{}
+
+			result, err := tokenRetriever.newRefreshTokenRequest(RefreshTokenExchangeRequest{
+				TokenEndpoint: "://issuer/oauth/token",
+			})
+
+			Expect(result).To(BeNil())
+			Expect(err).To(HaveOccurred())
+			// Match only the stable part of the message: Go 1.14 and later
+			// quote the URL inside url.Error, earlier releases do not.
+			Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		})
+	})
+
+	// Specs for newClientCredentialsRequest: the form body, URL and headers of
+	// the generated request, and the error path for an unparsable endpoint.
+	Describe("newClientCredentialsRequest", func() {
+		It("creates the request", func() {
+			tokenRetriever := TokenRetriever{}
+			exchangeRequest := ClientCredentialsExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				ClientSecret:  "clientSecret",
+				Audience:      "audience",
+			}
+
+			result, err := tokenRetriever.newClientCredentialsRequest(exchangeRequest)
+
+			// Check err before touching result, which is nil on failure.
+			Expect(err).To(BeNil())
+			Expect(result.ParseForm()).To(Succeed())
+
+			Expect(result.FormValue("grant_type")).To(Equal("client_credentials"))
+			Expect(result.FormValue("client_id")).To(Equal("clientID"))
+			Expect(result.FormValue("client_secret")).To(Equal("clientSecret"))
+			Expect(result.FormValue("audience")).To(Equal("audience"))
+			Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+
+			Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+			Expect(result.Header.Get("Content-Length")).To(Equal("93"))
+		})
+
+		It("returns an error when NewRequest returns an error", func() {
+			tokenRetriever := TokenRetriever{}
+
+			result, err := tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{
+				TokenEndpoint: "://issuer/oauth/token",
+			})
+
+			Expect(result).To(BeNil())
+			Expect(err).To(HaveOccurred())
+			// Match only the stable part of the message: Go 1.14 and later
+			// quote the URL inside url.Error, earlier releases do not.
+			Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		})
+	})
+
+	// Specs for newDeviceCodeExchangeRequest: the form body, URL and headers
+	// of the generated request, and the error path for an unparsable endpoint.
+	Describe("newDeviceCodeExchangeRequest", func() {
+		It("creates the request", func() {
+			tokenRetriever := TokenRetriever{}
+			exchangeRequest := DeviceCodeExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				DeviceCode:    "deviceCode",
+				PollInterval:  time.Duration(5) * time.Second,
+			}
+
+			result, err := tokenRetriever.newDeviceCodeExchangeRequest(exchangeRequest)
+
+			// Check err before touching result, which is nil on failure.
+			Expect(err).To(BeNil())
+			Expect(result.ParseForm()).To(Succeed())
+
+			Expect(result.FormValue("grant_type")).To(Equal("urn:ietf:params:oauth:grant-type:device_code"))
+			Expect(result.FormValue("client_id")).To(Equal("clientID"))
+			Expect(result.FormValue("device_code")).To(Equal("deviceCode"))
+			Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+
+			Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+			Expect(result.Header.Get("Content-Length")).To(Equal("107"))
+		})
+
+		It("returns an error when NewRequest returns an error", func() {
+			tokenRetriever := TokenRetriever{}
+
+			// Exercise the device-code builder itself (not the
+			// client-credentials one) so this error path is really covered.
+			result, err := tokenRetriever.newDeviceCodeExchangeRequest(DeviceCodeExchangeRequest{
+				TokenEndpoint: "://issuer/oauth/token",
+			})
+
+			Expect(result).To(BeNil())
+			Expect(err).To(HaveOccurred())
+			// Match only the stable part of the message: Go 1.14 and later
+			// quote the URL inside url.Error, earlier releases do not.
+			Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		})
+	})
+
+	// Specs for ExchangeDeviceCode, driven by a scripted MockTransport: the
+	// immediate success path, cancellation, polling on pending/slow_down, and
+	// the two terminal error codes.
+	Describe("ExchangeDeviceCode", func() {
+		var mockTransport *MockTransport
+		var tokenRetriever *TokenRetriever
+		var exchangeRequest DeviceCodeExchangeRequest
+		var tokenResult TokenResult
+
+		BeforeEach(func() {
+			mockTransport = &MockTransport{}
+			tokenRetriever = &TokenRetriever{
+				transport: mockTransport,
+			}
+			exchangeRequest = DeviceCodeExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				DeviceCode:    "deviceCode",
+				PollInterval:  time.Duration(1) * time.Second,
+			}
+			tokenResult = TokenResult{
+				ExpiresIn:    1,
+				AccessToken:  "myAccessToken",
+				RefreshToken: "myRefreshToken",
+			}
+		})
+
+		It("returns a token", func() {
+			// The very first poll succeeds, so no waiting is involved.
+			mockTransport.Responses = []*http.Response{
+				buildResponse(200, &tokenResult),
+			}
+			token, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+			Expect(err).To(BeNil())
+			Expect(token).To(Equal(&tokenResult))
+		})
+
+		It("supports cancellation", func() {
+			mockTransport.Responses = []*http.Response{
+				buildResponse(400, &TokenErrorResponse{"authorization_pending", ""}),
+			}
+			// The context is already cancelled when the first wait begins.
+			ctx, cancel := context.WithCancel(context.Background())
+			cancel()
+			_, err := tokenRetriever.ExchangeDeviceCode(ctx, exchangeRequest)
+			Expect(err).ToNot(BeNil())
+			Expect(err.Error()).To(Equal("cancelled"))
+		})
+
+		It("implements authorization_pending and slow_down", func() {
+			startTime := time.Now()
+			mockTransport.Responses = []*http.Response{
+				buildResponse(400, &TokenErrorResponse{"authorization_pending", ""}),
+				buildResponse(400, &TokenErrorResponse{"authorization_pending", ""}),
+				buildResponse(400, &TokenErrorResponse{"slow_down", ""}),
+				buildResponse(200, &tokenResult),
+			}
+			token, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+			Expect(err).To(BeNil())
+			Expect(token).To(Equal(&tokenResult))
+			endTime := time.Now()
+			// Three non-terminal answers mean at least three poll intervals.
+			Expect(endTime.Sub(startTime)).To(BeNumerically(">", exchangeRequest.PollInterval*3))
+		})
+
+		It("implements expired_token", func() {
+			mockTransport.Responses = []*http.Response{
+				buildResponse(400, &TokenErrorResponse{"expired_token", ""}),
+			}
+			_, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+			Expect(err).ToNot(BeNil())
+			Expect(err.Error()).To(Equal("the device code has expired"))
+		})
+
+		It("implements access_denied", func() {
+			mockTransport.Responses = []*http.Response{
+				buildResponse(400, &TokenErrorResponse{"access_denied", ""}),
+			}
+			_, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+			Expect(err).ToNot(BeNil())
+			Expect(err.Error()).To(Equal("the device was not authorized"))
+		})
+	})
+})
+
+// buildResponse fabricates an *http.Response with the given status code whose
+// body is the JSON encoding of body. The Content-Type header is set to
+// "application/json" only when the encoded payload is a JSON object (starts
+// with "{"). It panics if body cannot be marshalled.
+func buildResponse(statusCode int, body interface{}) *http.Response {
+	payload, err := json.Marshal(body)
+	if err != nil {
+		panic(err)
+	}
+
+	header := map[string][]string{}
+	response := &http.Response{
+		StatusCode: statusCode,
+		Header:     header,
+		Body:       ioutil.NopCloser(bytes.NewReader(payload)),
+	}
+	if isObject := strings.HasPrefix(string(payload), "{"); isObject {
+		response.Header.Add("Content-Type", "application/json")
+	}
+	return response
+}
diff --git a/oauth2/cache/cache.go b/oauth2/cache/cache.go
new file mode 100644
index 0000000..3d90bfa
--- /dev/null
+++ b/oauth2/cache/cache.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"fmt"
+	"sync"
+	"time"
+
+	"github.com/apache/pulsar-client-go/oauth2"
+	"github.com/apache/pulsar-client-go/oauth2/store"
+
+	xoauth2 "golang.org/x/oauth2"
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+// A CachingTokenSource is anything that can return a token, and is backed by a cache.
+// It extends xoauth2.TokenSource with a way to discard the cached token.
+type CachingTokenSource interface {
+	xoauth2.TokenSource
+
+	// InvalidateToken is called when the token is rejected by the resource server.
+	InvalidateToken() error
+}
+
+const (
+	// expiryDelta adjusts the token TTL to avoid using tokens which are almost expired
+	expiryDelta = time.Duration(60) * time.Second
+)
+
+// tokenCache implements a cache for the token associated with a specific audience.
+// it interacts with the store when the access token is near expiration or invalidated.
+// it is advisable to use a token cache instance per audience.
+type tokenCache struct {
+	// clock supplies the current time for expiry checks.
+	clock clock.Clock
+	// lock is held for the whole of Token and InvalidateToken.
+	lock sync.Mutex
+	// store loads and saves the grant for the audience.
+	store store.Store
+	// audience is the key under which the grant is stored.
+	audience string
+	// refresher obtains a fresh grant when the stored token is unusable.
+	refresher oauth2.AuthorizationGrantRefresher
+	// token is the in-memory copy of the most recent token; nil after
+	// InvalidateToken.
+	token *xoauth2.Token
+}
+
+// NewDefaultTokenCache builds a CachingTokenSource for the given audience that
+// reads and writes grants through store, refreshes them with refresher and
+// measures time with the real clock. The returned error is always nil.
+func NewDefaultTokenCache(store store.Store, audience string,
+	refresher oauth2.AuthorizationGrantRefresher) (CachingTokenSource, error) {
+	tc := new(tokenCache)
+	tc.clock = clock.RealClock{}
+	tc.store = store
+	tc.audience = audience
+	tc.refresher = refresher
+	return tc, nil
+}
+
+// Compile-time check that tokenCache satisfies CachingTokenSource.
+var _ CachingTokenSource = &tokenCache{}
+
+// Token returns a valid access token, if available. It tries, in order: the
+// in-memory token, the token of the grant held by the store, and finally a
+// freshly refreshed grant, which is then written back to the store.
+func (t *tokenCache) Token() (*xoauth2.Token, error) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	// usable reports whether the in-memory token can be handed out as is.
+	usable := func() bool {
+		return t.token != nil && t.validateAccessToken(*t.token)
+	}
+
+	// 1. in-memory token
+	if usable() {
+		return t.token, nil
+	}
+
+	// 2. token persisted in the store
+	stored, err := t.store.LoadGrant(t.audience)
+	if err != nil {
+		return nil, fmt.Errorf("LoadGrant: %v", err)
+	}
+	if t.token = stored.Token; usable() {
+		return t.token, nil
+	}
+
+	// 3. refresh the grant, remember the new token and persist it
+	refreshed, err := t.refresher.Refresh(stored)
+	if err != nil {
+		return nil, fmt.Errorf("RefreshGrant: %v", err)
+	}
+	t.token = refreshed.Token
+	if err = t.store.SaveGrant(t.audience, *refreshed); err != nil {
+		// TODO log rather than throw
+		return nil, fmt.Errorf("SaveGrant: %v", err)
+	}
+
+	return t.token, nil
+}
+
+// InvalidateToken drops the in-memory access token (typically after the
+// resource server rejected it). If the store still holds that same access
+// token, its expiry is moved into the past and the grant is saved again, so
+// the rest of the grant - including any refresh token - survives.
+func (t *tokenCache) InvalidateToken() error {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	stale := t.token
+	t.token = nil
+
+	// nothing was handed out earlier, so there is nothing to clear in the store
+	if stale == nil || stale.AccessToken == "" {
+		return nil
+	}
+
+	grant, err := t.store.LoadGrant(t.audience)
+	if err != nil {
+		return fmt.Errorf("LoadGrant: %v", err)
+	}
+
+	// leave the store alone if it has since been updated with another token
+	if grant.Token == nil || grant.Token.AccessToken != stale.AccessToken {
+		return nil
+	}
+
+	grant.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
+	if err = t.store.SaveGrant(t.audience, *grant); err != nil {
+		// TODO log rather than throw
+		return fmt.Errorf("SaveGrant: %v", err)
+	}
+	return nil
+}
+
+// validateAccessToken reports whether token can still be used: it must carry
+// an access token and, when it has an expiry, the current time must not be
+// later than that expiry minus expiryDelta. A zero expiry never expires.
+func (t *tokenCache) validateAccessToken(token xoauth2.Token) bool {
+	switch {
+	case token.AccessToken == "":
+		return false
+	case token.Expiry.IsZero():
+		return true
+	default:
+		// Round(0) strips any monotonic clock reading before the comparison.
+		deadline := token.Expiry.Round(0).Add(-expiryDelta)
+		return !t.clock.Now().After(deadline)
+	}
+}
diff --git a/oauth2/client_credentials_flow.go b/oauth2/client_credentials_flow.go
new file mode 100644
index 0000000..808b09b
--- /dev/null
+++ b/oauth2/client_credentials_flow.go
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"net/http"
+
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+
+	"github.com/pkg/errors"
+)
+
+// ClientCredentialsFlow takes care of the mechanics needed for getting an access
+// token using the OAuth 2.0 "Client Credentials Flow"
+type ClientCredentialsFlow struct {
+	// options are the options the flow was created with.
+	options ClientCredentialsFlowOptions
+	// oidcWellKnownEndpoints supplies the token endpoint placed in grants.
+	oidcWellKnownEndpoints OIDCWellKnownEndpoints
+	// keyfile holds the client credentials placed in grants.
+	keyfile *KeyFile
+	// exchanger performs the credentials-for-token exchange.
+	exchanger ClientCredentialsExchanger
+	// clock is handed to the grant refresher used by Authorize.
+	clock clock.Clock
+}
+
+// ClientCredentialsProvider abstracts getting client credentials
+type ClientCredentialsProvider interface {
+	// GetClientCredentials returns the credentials as a KeyFile, or an error
+	// when they cannot be obtained.
+	GetClientCredentials() (*KeyFile, error)
+}
+
+// ClientCredentialsExchanger abstracts exchanging client credentials for tokens
+type ClientCredentialsExchanger interface {
+	// ExchangeClientCredentials performs the exchange described by req and
+	// returns the resulting tokens.
+	ExchangeClientCredentials(req ClientCredentialsExchangeRequest) (*TokenResult, error)
+}
+
+// ClientCredentialsFlowOptions configures a ClientCredentialsFlow.
+type ClientCredentialsFlowOptions struct {
+	// KeyFile is passed to NewClientCredentialsProviderFromKeyFile to locate
+	// the client credentials.
+	KeyFile string
+	// AdditionalScopes is not read by the code in this file.
+	// NOTE(review): presumably extra OAuth scopes to request - confirm usage.
+	AdditionalScopes []string
+}
+
+// newClientCredentialsFlow assembles a ClientCredentialsFlow from already
+// resolved collaborators. It performs no I/O, which makes it the constructor
+// of choice for tests.
+func newClientCredentialsFlow(
+	options ClientCredentialsFlowOptions,
+	keyfile *KeyFile,
+	oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+	exchanger ClientCredentialsExchanger,
+	clock clock.Clock) *ClientCredentialsFlow {
+	flow := new(ClientCredentialsFlow)
+	flow.options = options
+	flow.oidcWellKnownEndpoints = oidcWellKnownEndpoints
+	flow.keyfile = keyfile
+	flow.exchanger = exchanger
+	flow.clock = clock
+	return flow
+}
+
+// NewDefaultClientCredentialsFlow builds a client credentials flow with the
+// default wiring: credentials are read from options.KeyFile, the OIDC
+// endpoints are resolved from the key file's issuer URL, and token exchanges
+// go through a plain http.Client using the real clock.
+func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) (*ClientCredentialsFlow, error) {
+	keyFile, err := NewClientCredentialsProviderFromKeyFile(options.KeyFile).GetClientCredentials()
+	if err != nil {
+		return nil, errors.Wrap(err, "could not get client credentials")
+	}
+
+	endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(keyFile.IssuerURL)
+	if err != nil {
+		return nil, err
+	}
+
+	exchanger := NewTokenRetriever(&http.Client{})
+	flow := newClientCredentialsFlow(options, keyFile, *endpoints, exchanger, clock.RealClock{})
+	return flow, nil
+}
+
+// Compile-time check that ClientCredentialsFlow satisfies Flow.
+var _ Flow = &ClientCredentialsFlow{}
+
+// Authorize creates a client-credentials grant for the audience and
+// immediately refreshes it, which both verifies the credentials and yields
+// the initial access token.
+func (c *ClientCredentialsFlow) Authorize(audience string) (*AuthorizationGrant, error) {
+	refresher := ClientCredentialsGrantRefresher{
+		exchanger: c.exchanger,
+		clock:     c.clock,
+	}
+	unverified := AuthorizationGrant{
+		Type:              GrantTypeClientCredentials,
+		Audience:          audience,
+		ClientID:          c.keyfile.ClientID,
+		ClientCredentials: c.keyfile,
+		TokenEndpoint:     c.oidcWellKnownEndpoints.TokenEndpoint,
+	}
+
+	// test the credentials and obtain an initial access token
+	verified, err := refresher.Refresh(&unverified)
+	if err != nil {
+		return nil, errors.Wrap(err, "authentication failed using client credentials")
+	}
+	return verified, nil
+}
+
+// ClientCredentialsGrantRefresher refreshes client-credentials grants by
+// exchanging the grant's credentials for a new token.
+type ClientCredentialsGrantRefresher struct {
+	// exchanger performs the credentials-for-token exchange.
+	exchanger ClientCredentialsExchanger
+	// clock is passed to convertToOAuth2Token when building the new token.
+	clock clock.Clock
+}
+
+// NewDefaultClientCredentialsGrantRefresher creates a refresher that performs
+// its exchanges through a plain http.Client and measures time with the given
+// clock. The returned error is always nil.
+func NewDefaultClientCredentialsGrantRefresher(clock clock.Clock) (*ClientCredentialsGrantRefresher, error) {
+	refresher := new(ClientCredentialsGrantRefresher)
+	refresher.exchanger = NewTokenRetriever(&http.Client{})
+	refresher.clock = clock
+	return refresher, nil
+}
+
+// Compile-time check that ClientCredentialsGrantRefresher satisfies
+// AuthorizationGrantRefresher.
+var _ AuthorizationGrantRefresher = &ClientCredentialsGrantRefresher{}
+
+// Refresh exchanges the client credentials stored in grant for a new token
+// and returns a new grant carrying that token; the input grant is not
+// modified.
+//
+// It returns an error when grant is nil, is not a client-credentials grant,
+// carries no client credentials, or when the exchange itself fails.
+func (g *ClientCredentialsGrantRefresher) Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error) {
+	// Guard against grants that would otherwise cause a nil dereference,
+	// e.g. an incomplete grant loaded from a store.
+	if grant == nil {
+		return nil, errors.New("no grant to refresh")
+	}
+	if grant.Type != GrantTypeClientCredentials {
+		return nil, errors.New("unsupported grant type")
+	}
+	if grant.ClientCredentials == nil {
+		return nil, errors.New("grant has no client credentials")
+	}
+
+	exchangeRequest := ClientCredentialsExchangeRequest{
+		TokenEndpoint: grant.TokenEndpoint,
+		Audience:      grant.Audience,
+		ClientID:      grant.ClientCredentials.ClientID,
+		ClientSecret:  grant.ClientCredentials.ClientSecret,
+	}
+	tr, err := g.exchanger.ExchangeClientCredentials(exchangeRequest)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not exchange client credentials")
+	}
+
+	token := convertToOAuth2Token(tr, g.clock)
+	grant = &AuthorizationGrant{
+		Type:              GrantTypeClientCredentials,
+		Audience:          grant.Audience,
+		ClientID:          grant.ClientID,
+		ClientCredentials: grant.ClientCredentials,
+		TokenEndpoint:     grant.TokenEndpoint,
+		Token:             &token,
+	}
+	return grant, nil
+}
diff --git a/oauth2/client_credentials_flow_test.go b/oauth2/client_credentials_flow_test.go
new file mode 100644
index 0000000..6e123ce
--- /dev/null
+++ b/oauth2/client_credentials_flow_test.go
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"errors"
+	"time"
+
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+	"github.com/apache/pulsar-client-go/oauth2/clock/testing"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+type MockClientCredentialsProvider struct { // test double for ClientCredentialsProvider
+	Called                  bool     // true once GetClientCredentials has been invoked
+	ClientCredentialsResult *KeyFile // key file handed back by GetClientCredentials
+	ReturnsError            error    // error handed back by GetClientCredentials
+}
+
+func (m *MockClientCredentialsProvider) GetClientCredentials() (*KeyFile, error) {
+	m.Called = true // record the invocation so specs can assert on it
+	return m.ClientCredentialsResult, m.ReturnsError
+}
+
+var _ ClientCredentialsProvider = &MockClientCredentialsProvider{}
+
+var clientCredentials = KeyFile{ // shared key file fixture for the specs in this file
+	Type:         KeyFileTypeServiceAccount,
+	ClientID:     "test_clientID",
+	ClientSecret: "test_clientSecret",
+	ClientEmail:  "test_clientEmail",
+	IssuerURL:    "http://issuer",
+}
+
+// Specs for ClientCredentialsFlow.Authorize, driven by a fake clock and a
+// mock token exchanger that are rebuilt before every spec.
+var _ = Describe("ClientCredentialsFlow", func() {
+	Describe("Authorize", func() {
+
+		var mockClock clock.Clock
+		var mockTokenExchanger *MockTokenExchanger
+
+		BeforeEach(func() {
+			mockClock = testing.NewFakeClock(time.Unix(0, 0))
+			expectedTokens := TokenResult{AccessToken: "accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
+			mockTokenExchanger = &MockTokenExchanger{
+				ReturnsTokens: &expectedTokens,
+			}
+		})
+
+		It("invokes TokenExchanger with credentials", func() {
+			provider := newClientCredentialsFlow(
+				ClientCredentialsFlowOptions{
+					KeyFile: "test_keyfile",
+				},
+				&clientCredentials,
+				oidcEndpoints,
+				mockTokenExchanger,
+				mockClock,
+			)
+
+			_, err := provider.Authorize("test_audience")
+			Expect(err).ToNot(HaveOccurred())
+			Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+				ClientID:      clientCredentials.ClientID,
+				ClientSecret:  clientCredentials.ClientSecret,
+				Audience:      "test_audience",
+			}))
+		})
+
+		It("returns TokensResult from TokenExchanger", func() {
+			provider := newClientCredentialsFlow(
+				ClientCredentialsFlowOptions{
+					KeyFile: "test_keyfile",
+				},
+				&clientCredentials,
+				oidcEndpoints,
+				mockTokenExchanger,
+				mockClock,
+			)
+
+			grant, err := provider.Authorize("test_audience")
+			Expect(err).ToNot(HaveOccurred())
+			expected := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+			Expect(*grant.Token).To(Equal(expected))
+		})
+
+		It("returns an error if token exchanger errors", func() {
+			mockTokenExchanger.ReturnsError = errors.New("someerror")
+			mockTokenExchanger.ReturnsTokens = nil
+
+			provider := newClientCredentialsFlow(
+				ClientCredentialsFlowOptions{
+					KeyFile: "test_keyfile",
+				},
+				&clientCredentials,
+				oidcEndpoints,
+				mockTokenExchanger,
+				mockClock,
+			)
+
+			_, err := provider.Authorize("test_audience")
+			// Assert the error exists first so a regression fails the spec
+			// cleanly instead of panicking on err.Error() with a nil error.
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(Equal("authentication failed using client credentials: " +
+				"could not exchange client credentials: someerror"))
+		})
+	})
+})
+
+// Specs for ClientCredentialsGrantRefresher.Refresh, driven by a fake clock
+// and a mock token exchanger that are rebuilt before every spec.
+var _ = Describe("ClientCredentialsGrantRefresher", func() {
+
+	Describe("Refresh", func() {
+		var (
+			mockClock          clock.Clock
+			mockTokenExchanger *MockTokenExchanger
+		)
+
+		// newRefresher wires the refresher under test to the per-spec fakes.
+		newRefresher := func() *ClientCredentialsGrantRefresher {
+			return &ClientCredentialsGrantRefresher{
+				clock:     mockClock,
+				exchanger: mockTokenExchanger,
+			}
+		}
+
+		// newGrant returns a client credentials grant that has no token yet.
+		newGrant := func() *AuthorizationGrant {
+			return &AuthorizationGrant{
+				Type:              GrantTypeClientCredentials,
+				Audience:          "test_audience",
+				ClientCredentials: &clientCredentials,
+				TokenEndpoint:     oidcEndpoints.TokenEndpoint,
+				Token:             nil,
+			}
+		}
+
+		BeforeEach(func() {
+			mockClock = testing.NewFakeClock(time.Unix(0, 0))
+			expectedTokens := TokenResult{AccessToken: "accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
+			mockTokenExchanger = &MockTokenExchanger{
+				ReturnsTokens: &expectedTokens,
+			}
+		})
+
+		It("invokes TokenExchanger with credentials", func() {
+			refresher := newRefresher()
+			og := newGrant()
+
+			_, err := refresher.Refresh(og)
+			Expect(err).ToNot(HaveOccurred())
+
+			wantRequest := &ClientCredentialsExchangeRequest{
+				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+				ClientID:      clientCredentials.ClientID,
+				ClientSecret:  clientCredentials.ClientSecret,
+				Audience:      og.Audience,
+			}
+			Expect(mockTokenExchanger.CalledWithRequest).To(Equal(wantRequest))
+		})
+
+		It("returns a valid grant", func() {
+			refresher := newRefresher()
+			og := newGrant()
+
+			ng, err := refresher.Refresh(og)
+			Expect(err).ToNot(HaveOccurred())
+			Expect(ng.Audience).To(Equal("test_audience"))
+			Expect(ng.ClientID).To(Equal(""))
+			Expect(*ng.ClientCredentials).To(Equal(clientCredentials))
+			Expect(ng.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+
+			expected := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+			Expect(*ng.Token).To(Equal(expected))
+		})
+	})
+})
diff --git a/oauth2/client_credentials_provider.go b/oauth2/client_credentials_provider.go
new file mode 100644
index 0000000..2716376
--- /dev/null
+++ b/oauth2/client_credentials_provider.go
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"encoding/json"
+	"fmt"
+	"io/ioutil"
+)
+
+const (
+	KeyFileTypeServiceAccount = "sn_service_account" // the only key file "type" value GetClientCredentials accepts
+)
+
+type KeyFileProvider struct { // loads client credentials from a JSON key file
+	KeyFile string // path handed to ioutil.ReadFile by GetClientCredentials
+}
+
+type KeyFile struct { // JSON layout of a key file
+	Type         string `json:"type"`          // must equal KeyFileTypeServiceAccount to be accepted by KeyFileProvider
+	ClientID     string `json:"client_id"`     // sent as ClientID in the client credentials exchange
+	ClientSecret string `json:"client_secret"` // sent as ClientSecret in the client credentials exchange
+	ClientEmail  string `json:"client_email"`
+	IssuerURL    string `json:"issuer_url"`
+}
+
+// NewClientCredentialsProviderFromKeyFile returns a KeyFileProvider that
+// will read credentials from the file at keyFile. The file is not touched
+// until GetClientCredentials is called.
+func NewClientCredentialsProviderFromKeyFile(keyFile string) *KeyFileProvider {
+	provider := new(KeyFileProvider)
+	provider.KeyFile = keyFile
+	return provider
+}
+
+var _ ClientCredentialsProvider = &KeyFileProvider{}
+
+// GetClientCredentials reads the file named by k.KeyFile, decodes it as JSON
+// into a KeyFile and returns it. Read and decode errors are returned
+// unchanged; a key file whose type is not KeyFileTypeServiceAccount is
+// rejected with an "unsupported format" error.
+func (k *KeyFileProvider) GetClientCredentials() (*KeyFile, error) {
+	raw, err := ioutil.ReadFile(k.KeyFile)
+	if err != nil {
+		return nil, err
+	}
+
+	creds := new(KeyFile)
+	if err := json.Unmarshal(raw, creds); err != nil {
+		return nil, err
+	}
+
+	if creds.Type != KeyFileTypeServiceAccount {
+		return nil, fmt.Errorf("open %s: unsupported format", k.KeyFile)
+	}
+	return creds, nil
+}
diff --git a/oauth2/clock/clock.go b/oauth2/clock/clock.go
new file mode 100644
index 0000000..f170d9a
--- /dev/null
+++ b/oauth2/clock/clock.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package clock
+
+import "time"
+
+// Clock allows for injecting fake or real clocks into code that
+// needs to do arbitrary things based on time.
+type Clock interface {
+	Now() time.Time                         // current time according to this clock
+	Since(time.Time) time.Duration          // duration between the given time and this clock's current time
+	After(d time.Duration) <-chan time.Time // counterpart of time.After
+	NewTimer(d time.Duration) Timer         // counterpart of time.NewTimer, returning the Timer abstraction
+	Sleep(d time.Duration)                  // counterpart of time.Sleep
+	Tick(d time.Duration) <-chan time.Time  // counterpart of time.Tick
+}
+
+var _ = Clock(RealClock{}) // compile-time check that RealClock satisfies Clock
+
+// RealClock implements Clock by delegating every method to the standard time package.
+type RealClock struct{}
+
+// Now returns the current time as reported by time.Now().
+func (RealClock) Now() time.Time {
+	return time.Now()
+}
+
+// Since returns the time elapsed since ts, as reported by time.Since(ts).
+func (RealClock) Since(ts time.Time) time.Duration {
+	return time.Since(ts)
+}
+
+// After is the same as time.After(d).
+func (RealClock) After(d time.Duration) <-chan time.Time {
+	return time.After(d)
+}
+
+// NewTimer wraps time.NewTimer(d) in a realTimer so it satisfies the Timer interface.
+func (RealClock) NewTimer(d time.Duration) Timer {
+	return &realTimer{
+		timer: time.NewTimer(d),
+	}
+}
+
+// Tick is the same as time.Tick(d).
+func (RealClock) Tick(d time.Duration) <-chan time.Time {
+	return time.Tick(d)
+}
+
+// Sleep is the same as time.Sleep(d).
+func (RealClock) Sleep(d time.Duration) {
+	time.Sleep(d)
+}
+
+// Timer allows for injecting fake or real timers into code that
+// needs to do arbitrary things based on time.
+type Timer interface {
+	C() <-chan time.Time        // channel on which the timer delivers its firing time
+	Stop() bool                 // counterpart of (*time.Timer).Stop
+	Reset(d time.Duration) bool // counterpart of (*time.Timer).Reset
+}
+
+var _ = Timer(&realTimer{}) // compile-time check that *realTimer satisfies Timer
+
+// realTimer implements Timer on top of an actual time.Timer.
+type realTimer struct {
+	timer *time.Timer
+}
+
+// C returns the underlying timer's channel (the C field of time.Timer).
+func (r *realTimer) C() <-chan time.Time {
+	return r.timer.C
+}
+
+// Stop calls Stop() on the underlying timer and returns its result.
+func (r *realTimer) Stop() bool {
+	return r.timer.Stop()
+}
+
+// Reset calls Reset(d) on the underlying timer and returns its result.
+func (r *realTimer) Reset(d time.Duration) bool {
+	return r.timer.Reset(d)
+}
diff --git a/oauth2/clock/testing/fake_clock.go b/oauth2/clock/testing/fake_clock.go
new file mode 100644
index 0000000..6fcbf4c
--- /dev/null
+++ b/oauth2/clock/testing/fake_clock.go
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package testing
+
+import (
+	"sync"
+	"time"
+
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+var (
+	_ = clock.Clock(&FakeClock{})     // compile-time check: *FakeClock satisfies clock.Clock
+	_ = clock.Clock(&IntervalClock{}) // compile-time check: *IntervalClock satisfies clock.Clock
+)
+
+// FakeClock implements clock.Clock, but returns an arbitrary time.
+type FakeClock struct {
+	lock sync.RWMutex // guards time and waiters
+	time time.Time    // the clock's current fake time; changed only via Step/SetTime/Sleep
+
+	// waiters are waiting for the fake time to pass their specified time
+	waiters []*fakeClockWaiter
+}
+
+type fakeClockWaiter struct { // one pending After/Tick/NewTimer registration on a FakeClock
+	targetTime    time.Time      // the waiter is notified once the fake time is no longer before this
+	stepInterval  time.Duration  // when > 0 the waiter is rescheduled by this interval after being due (Tick)
+	skipIfBlocked bool           // when true the notification is dropped instead of blocking on a full channel
+	destChan      chan time.Time // receives the fake time at which the waiter was notified
+	fired         bool           // set once a value has been delivered on destChan
+}
+
+// NewFakeClock returns a FakeClock whose current time is t and which has no
+// waiters registered.
+func NewFakeClock(t time.Time) *FakeClock {
+	fc := new(FakeClock)
+	fc.time = t
+	return fc
+}
+
+// Now reports the fake clock's current time, read under the read lock.
+func (f *FakeClock) Now() time.Time {
+	f.lock.RLock()
+	now := f.time
+	f.lock.RUnlock()
+	return now
+}
+
+// Since reports how far the fake clock's current time lies after ts.
+func (f *FakeClock) Since(ts time.Time) time.Duration {
+	f.lock.RLock()
+	elapsed := f.time.Sub(ts)
+	f.lock.RUnlock()
+	return elapsed
+}
+
+// After is the fake counterpart of time.After(d): it registers a one-shot
+// waiter due at the current fake time plus d and returns the channel on
+// which that waiter will be notified.
+func (f *FakeClock) After(d time.Duration) <-chan time.Time {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	waiter := &fakeClockWaiter{
+		targetTime: f.time.Add(d),
+		destChan:   make(chan time.Time, 1), // Don't block!
+	}
+	f.waiters = append(f.waiters, waiter)
+	return waiter.destChan
+}
+
+// NewTimer is the fake counterpart of time.NewTimer(d). The returned timer's
+// embedded waiter is registered with the clock and is due at the current
+// fake time plus d.
+func (f *FakeClock) NewTimer(d time.Duration) clock.Timer {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	timer := &fakeTimer{fakeClock: f}
+	timer.waiter.targetTime = f.time.Add(d)
+	timer.waiter.destChan = make(chan time.Time, 1) // Don't block!
+
+	// Register a pointer to the timer's own waiter so Stop and Reset can
+	// find it again by address.
+	f.waiters = append(f.waiters, &timer.waiter)
+	return timer
+}
+
+// Tick is the fake counterpart of time.Tick. It returns nil when d is not
+// positive; otherwise it registers a repeating waiter with interval d whose
+// notifications are dropped rather than blocking when the channel is full.
+func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
+	if d <= 0 {
+		return nil
+	}
+
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	ticker := &fakeClockWaiter{
+		targetTime:    f.time.Add(d),
+		stepInterval:  d,
+		skipIfBlocked: true,
+		destChan:      make(chan time.Time, 1), // hold one tick
+	}
+	f.waiters = append(f.waiters, ticker)
+	return ticker.destChan
+}
+
+// Step advances the clock by d and notifies the waiters registered through After,
+// Tick, or NewTimer whose target time has been reached.
+func (f *FakeClock) Step(d time.Duration) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.setTimeLocked(f.time.Add(d))
+}
+
+// SetTime sets the clock to t and notifies the waiters whose target time is not after t.
+func (f *FakeClock) SetTime(t time.Time) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.setTimeLocked(t)
+}
+
+// setTimeLocked moves the clock to t and notifies every waiter that is due.
+// One-shot waiters are dropped once due; repeating waiters (stepInterval > 0)
+// are rescheduled past t and kept. f must be write-locked by the caller.
+func (f *FakeClock) setTimeLocked(t time.Time) {
+	f.time = t
+	remaining := make([]*fakeClockWaiter, 0, len(f.waiters))
+	for _, w := range f.waiters {
+		// Not due yet: keep it untouched.
+		if w.targetTime.After(t) {
+			remaining = append(remaining, w)
+			continue
+		}
+
+		if !w.skipIfBlocked {
+			w.destChan <- t
+			w.fired = true
+		} else {
+			// Non-blocking send: the notification is dropped when the
+			// receiver has not drained the previous one.
+			select {
+			case w.destChan <- t:
+				w.fired = true
+			default:
+			}
+		}
+
+		if w.stepInterval > 0 {
+			// Skip over every interval already covered by t so the next
+			// target lies strictly in the future.
+			for !w.targetTime.After(t) {
+				w.targetTime = w.targetTime.Add(w.stepInterval)
+			}
+			remaining = append(remaining, w)
+		}
+	}
+	f.waiters = remaining
+}
+
+// HasWaiters reports whether any waiter registered through After, Tick or
+// NewTimer is still pending on f (so you can write race-free tests).
+func (f *FakeClock) HasWaiters() bool {
+	f.lock.RLock()
+	defer f.lock.RUnlock()
+	pending := len(f.waiters)
+	return pending > 0
+}
+
+// Sleep is the fake counterpart of time.Sleep: it does not block, it advances the clock by d via Step.
+func (f *FakeClock) Sleep(d time.Duration) {
+	f.Step(d)
+}
+
+// IntervalClock implements clock.Clock, but each invocation of Now steps the clock forward the specified duration
+type IntervalClock struct {
+	Time     time.Time     // current time; advanced by Duration on every call to Now
+	Duration time.Duration // amount added to Time by each call to Now
+}
+
+// Now advances i's time by Duration and returns the new value.
+func (i *IntervalClock) Now() time.Time {
+	i.Time = i.Time.Add(i.Duration)
+	return i.Time
+}
+
+// Since returns the duration between ts and i's current time, without advancing the clock.
+func (i *IntervalClock) Since(ts time.Time) time.Duration {
+	return i.Time.Sub(ts)
+}
+
+// After is unimplemented, will panic.
+// TODO: make interval clock use FakeClock so this can be implemented.
+func (*IntervalClock) After(d time.Duration) <-chan time.Time {
+	panic("IntervalClock doesn't implement After")
+}
+
+// NewTimer is unimplemented, will panic.
+// TODO: make interval clock use FakeClock so this can be implemented.
+func (*IntervalClock) NewTimer(d time.Duration) clock.Timer {
+	panic("IntervalClock doesn't implement NewTimer")
+}
+
+// Tick is unimplemented, will panic.
+// TODO: make interval clock use FakeClock so this can be implemented.
+func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
+	panic("IntervalClock doesn't implement Tick")
+}
+
+// Sleep is unimplemented, will panic.
+func (*IntervalClock) Sleep(d time.Duration) {
+	panic("IntervalClock doesn't implement Sleep")
+}
+
+var _ = clock.Timer(&fakeTimer{}) // compile-time check that *fakeTimer satisfies clock.Timer
+
+// fakeTimer implements clock.Timer based on a FakeClock.
+type fakeTimer struct {
+	fakeClock *FakeClock      // clock whose lock and waiter list this timer uses
+	waiter    fakeClockWaiter // registered with fakeClock by address; Stop and Reset locate it by pointer comparison
+}
+
+// C returns the channel that notifies when this timer has fired.
+func (f *fakeTimer) C() <-chan time.Time {
+	return f.waiter.destChan
+}
+
+// Stop removes the timer's waiter from the fake clock so it can no longer be
+// notified. It returns true if the timer has not yet fired, or false
+// otherwise.
+func (f *fakeTimer) Stop() bool {
+	clk := f.fakeClock
+	clk.lock.Lock()
+	defer clk.lock.Unlock()
+
+	own := &f.waiter
+	kept := make([]*fakeClockWaiter, 0, len(clk.waiters))
+	for _, w := range clk.waiters {
+		if w == own {
+			continue
+		}
+		kept = append(kept, w)
+	}
+	clk.waiters = kept
+
+	return !own.fired
+}
+
+// Reset resets the timer to the fake clock's "now" + d. It returns true if the timer has not yet
+// fired, or false otherwise.
+func (f *fakeTimer) Reset(d time.Duration) bool {
+	f.fakeClock.lock.Lock()
+	defer f.fakeClock.lock.Unlock()
+
+	active := !f.waiter.fired
+
+	f.waiter.fired = false
+	f.waiter.targetTime = f.fakeClock.time.Add(d)
+
+	var isWaiting bool
+	for i := range f.fakeClock.waiters {
+		w := f.fakeClock.waiters[i]
+		if w == &f.waiter {
+			isWaiting = true
+			break
+		}
+	}
+	if !isWaiting {
+		f.fakeClock.waiters = append(f.fakeClock.waiters, &f.waiter)
+	}
+
+	return active
+}
diff --git a/oauth2/config_tokenprovider.go b/oauth2/config_tokenprovider.go
new file mode 100644
index 0000000..627749f
--- /dev/null
+++ b/oauth2/config_tokenprovider.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import "fmt"
+
+type configProvider interface { // token storage keyed by an identifier string
+	GetTokens(identifier string) (string, string)            // returns the access token, then the refresh token
+	SaveTokens(identifier, accessToken, refreshToken string) // stores both tokens under identifier
+}
+
+// ConfigBackedCachingProvider wraps a configProvider in order to conform to
+// the cachingProvider interface
+type ConfigBackedCachingProvider struct {
+	identifier string         // key under which tokens are read and written; "<clientID>-<audience>" when built by the constructor
+	config     configProvider // backing store the tokens are delegated to
+}
+
+// NewConfigBackedCachingProvider returns a ConfigBackedCachingProvider that
+// caches tokens in config under the identifier "<clientID>-<audience>".
+func NewConfigBackedCachingProvider(clientID, audience string, config configProvider) *ConfigBackedCachingProvider {
+	provider := &ConfigBackedCachingProvider{config: config}
+	provider.identifier = fmt.Sprintf("%s-%s", clientID, audience)
+	return provider
+}
+
+// GetTokens looks up the access and refresh token stored under the
+// provider's identifier and returns them in a new TokenResult. The returned
+// error is always nil.
+func (c *ConfigBackedCachingProvider) GetTokens() (*TokenResult, error) {
+	result := new(TokenResult)
+	result.AccessToken, result.RefreshToken = c.config.GetTokens(c.identifier)
+	return result, nil
+}
+
+// CacheTokens caches the access and refresh token from toCache in the
+// configProvider under the provider's identifier. It returns an error, and
+// stores nothing, when toCache is nil.
+func (c *ConfigBackedCachingProvider) CacheTokens(toCache *TokenResult) error {
+	// Fail cleanly instead of panicking on the field accesses below.
+	if toCache == nil {
+		return fmt.Errorf("no tokens to cache for %q", c.identifier)
+	}
+	c.config.SaveTokens(c.identifier, toCache.AccessToken, toCache.RefreshToken)
+	return nil
+}
diff --git a/oauth2/config_tokenprovider_test.go b/oauth2/config_tokenprovider_test.go
new file mode 100644
index 0000000..d949a5a
--- /dev/null
+++ b/oauth2/config_tokenprovider_test.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+type mockConfigProvider struct { // test double for configProvider
+	ReturnAccessToken         string // access token handed back by GetTokens
+	ReturnRefreshToken        string // refresh token handed back by GetTokens
+	GetTokensCalledIdentifier string // identifier passed to the last GetTokens call
+	SavedIdentifier           string // identifier passed to the last SaveTokens call
+	SavedAccessToken          string // access token passed to the last SaveTokens call
+	SavedRefreshToken         string // refresh token passed to the last SaveTokens call
+}
+
+func (m *mockConfigProvider) GetTokens(identifier string) (string, string) {
+	m.GetTokensCalledIdentifier = identifier // record the lookup key for later assertions
+	return m.ReturnAccessToken, m.ReturnRefreshToken
+}
+
+func (m *mockConfigProvider) SaveTokens(identifier, accessToken, refreshToken string) {
+	m.SavedIdentifier = identifier // record every argument for later assertions
+	m.SavedAccessToken = accessToken
+	m.SavedRefreshToken = refreshToken
+}
+
+// Specs for ConfigBackedCachingProvider: identifier construction and
+// delegation of token reads and writes to the wrapped configProvider.
+var _ = Describe("main", func() {
+	Describe("configCachingProvider", func() {
+		It("sets up the identifier using the clientID and audience", func() {
+			p := NewConfigBackedCachingProvider("iamclientid", "iamaudience", &mockConfigProvider{})
+
+			Expect(p.identifier).To(Equal("iamclientid-iamaudience"))
+		})
+
+		It("gets tokens from the config provider", func() {
+			c := &mockConfigProvider{
+				ReturnAccessToken:  "accessToken",
+				ReturnRefreshToken: "refreshToken",
+			}
+			p := ConfigBackedCachingProvider{
+				identifier: "iamidentifier",
+				config:     c,
+			}
+
+			r, err := p.GetTokens()
+
+			Expect(err).NotTo(HaveOccurred())
+			Expect(c.GetTokensCalledIdentifier).To(Equal(p.identifier))
+			Expect(r).To(Equal(&TokenResult{
+				AccessToken:  c.ReturnAccessToken,
+				RefreshToken: c.ReturnRefreshToken,
+			}))
+		})
+
+		It("caches the tokens in the config provider", func() {
+			c := &mockConfigProvider{}
+			p := ConfigBackedCachingProvider{
+				identifier: "iamidentifier",
+				config:     c,
+			}
+			toSave := &TokenResult{
+				AccessToken:  "accessToken",
+				RefreshToken: "refreshToken",
+			}
+
+			// Assert on the returned error instead of silently dropping it.
+			Expect(p.CacheTokens(toSave)).To(Succeed())
+
+			Expect(c.SavedIdentifier).To(Equal(p.identifier))
+			Expect(c.SavedAccessToken).To(Equal(toSave.AccessToken))
+			Expect(c.SavedRefreshToken).To(Equal(toSave.RefreshToken))
+		})
+	})
+})
diff --git a/oauth2/device_code_flow.go b/oauth2/device_code_flow.go
new file mode 100644
index 0000000..486fdfa
--- /dev/null
+++ b/oauth2/device_code_flow.go
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"time"
+
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+
+	"github.com/pkg/errors"
+)
+
+// DeviceCodeFlow takes care of the mechanics needed for getting an access
+// token using the OAuth 2.0 "Device Code Flow"
+type DeviceCodeFlow struct {
+	options                DeviceCodeFlowOptions  // client ID, extra scopes and refresh setting used by Authorize
+	oidcWellKnownEndpoints OIDCWellKnownEndpoints // supplies the token endpoint used for the exchange
+	codeProvider           DeviceCodeProvider     // obtains the device code
+	exchanger              DeviceTokenExchanger   // exchanges the device code for tokens
+	callback               DeviceCodeCallback     // when non-nil, invoked with the device code result before the exchange
+	clock                  clock.Clock            // passed to convertToOAuth2Token when building the token
+}
+
+// DeviceCodeProvider abstracts getting a device code for an audience and optional additional scopes
+type DeviceCodeProvider interface {
+	GetCode(audience string, additionalScopes ...string) (*DeviceCodeResult, error)
+}
+
+// DeviceTokenExchanger abstracts exchanging for tokens
+type DeviceTokenExchanger interface {
+	ExchangeDeviceCode(ctx context.Context, req DeviceCodeExchangeRequest) (*TokenResult, error) // used by DeviceCodeFlow.Authorize
+	ExchangeRefreshToken(req RefreshTokenExchangeRequest) (*TokenResult, error)                  // used by DeviceAuthorizationGrantRefresher.Refresh
+}
+
+type DeviceCodeCallback func(code *DeviceCodeResult) error // receives the device code result; a non-nil error aborts Authorize
+
+type DeviceCodeFlowOptions struct { // configuration for DeviceCodeFlow
+	IssuerEndpoint   string   // issuer URL from which NewDefaultDeviceCodeFlow resolves the OIDC well-known endpoints
+	ClientID         string   // client ID used for the device code request and the token exchange
+	AdditionalScopes []string // extra scopes requested along with the device code
+	AllowRefresh     bool     // when true, the "offline_access" scope is requested as well
+}
+
+// newDeviceCodeFlow assembles a DeviceCodeFlow from explicitly supplied
+// collaborators, which lets tests inject mocks for each of them.
+func newDeviceCodeFlow(
+	options DeviceCodeFlowOptions,
+	oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+	codeProvider DeviceCodeProvider,
+	exchanger DeviceTokenExchanger,
+	callback DeviceCodeCallback,
+	clock clock.Clock) *DeviceCodeFlow {
+	flow := new(DeviceCodeFlow)
+	flow.options = options
+	flow.oidcWellKnownEndpoints = oidcWellKnownEndpoints
+	flow.codeProvider = codeProvider
+	flow.exchanger = exchanger
+	flow.callback = callback
+	flow.clock = clock
+	return flow
+}
+
+// NewDefaultDeviceCodeFlow builds a device code flow with the default
+// collaborators: the OIDC well-known endpoints resolved from
+// options.IssuerEndpoint, a local device code provider, a token retriever
+// and the real clock. To request refresh tokens, set options.AllowRefresh.
+// An error is returned when the well-known endpoints cannot be resolved.
+func NewDefaultDeviceCodeFlow(options DeviceCodeFlowOptions,
+	callback DeviceCodeCallback) (*DeviceCodeFlow, error) {
+	endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(options.IssuerEndpoint)
+	if err != nil {
+		return nil, err
+	}
+
+	providerOptions := LocalDeviceCodeProviderOptions{
+		ClientID: options.ClientID,
+	}
+	codeProvider := NewLocalDeviceCodeProvider(providerOptions, *endpoints, &http.Client{})
+	tokenRetriever := NewTokenRetriever(&http.Client{})
+
+	flow := newDeviceCodeFlow(
+		options,
+		*endpoints,
+		codeProvider,
+		tokenRetriever,
+		callback,
+		clock.RealClock{},
+	)
+	return flow, nil
+}
+
+var _ Flow = &DeviceCodeFlow{}
+
+// Authorize runs the device code flow for audience. It requests a device
+// code (adding the "offline_access" scope when refresh is allowed), hands the
+// code result to the callback if one is set, exchanges the device code for
+// tokens and returns the resulting device code grant. Errors from the code
+// provider and the callback are returned unchanged; exchange errors are
+// wrapped.
+func (p *DeviceCodeFlow) Authorize(audience string) (*AuthorizationGrant, error) {
+	// Copy the configured scopes so the options slice is never appended to
+	// in place.
+	scopes := append([]string(nil), p.options.AdditionalScopes...)
+	if p.options.AllowRefresh {
+		scopes = append(scopes, "offline_access")
+	}
+
+	codeResult, err := p.codeProvider.GetCode(audience, scopes...)
+	if err != nil {
+		return nil, err
+	}
+
+	if p.callback != nil {
+		if cbErr := p.callback(codeResult); cbErr != nil {
+			return nil, cbErr
+		}
+	}
+
+	tokenEndpoint := p.oidcWellKnownEndpoints.TokenEndpoint
+	tr, err := p.exchanger.ExchangeDeviceCode(context.Background(), DeviceCodeExchangeRequest{
+		TokenEndpoint: tokenEndpoint,
+		ClientID:      p.options.ClientID,
+		DeviceCode:    codeResult.DeviceCode,
+		PollInterval:  time.Duration(codeResult.Interval) * time.Second,
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "could not exchange code")
+	}
+
+	token := convertToOAuth2Token(tr, p.clock)
+	return &AuthorizationGrant{
+		Type:          GrantTypeDeviceCode,
+		Audience:      audience,
+		ClientID:      p.options.ClientID,
+		TokenEndpoint: tokenEndpoint,
+		Token:         &token,
+	}, nil
+}
+
+// DeviceAuthorizationGrantRefresher renews a device code grant by exchanging
+// its refresh token for new tokens.
+type DeviceAuthorizationGrantRefresher struct {
+	exchanger DeviceTokenExchanger
+	clock     clock.Clock
+}
+
+// NewDefaultDeviceAuthorizationGrantRefresher constructs a grant refresher
+// for grants produced by the device authorization flow. Its exchanger is a
+// token retriever backed by a new http.Client and its clock is the one
+// supplied. The returned error is always nil.
+func NewDefaultDeviceAuthorizationGrantRefresher(clock clock.Clock) (*DeviceAuthorizationGrantRefresher, error) {
+	refresher := &DeviceAuthorizationGrantRefresher{
+		exchanger: NewTokenRetriever(&http.Client{}),
+		clock:     clock,
+	}
+	return refresher, nil
+}
+
+var _ AuthorizationGrantRefresher = &DeviceAuthorizationGrantRefresher{}
+
+// Refresh exchanges the refresh token held by grant for new tokens and
+// returns a fresh AuthorizationGrant carrying them. The grant passed in is
+// not modified.
+//
+// An error is returned when grant is nil, is not of type GrantTypeDeviceCode,
+// holds no refresh token, or when the exchange fails or yields no result.
+func (g *DeviceAuthorizationGrantRefresher) Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error) {
+	if grant == nil {
+		return nil, errors.New("authorization grant is required")
+	}
+	if grant.Type != GrantTypeDeviceCode {
+		return nil, errors.New("unsupported grant type")
+	}
+	if grant.Token == nil || grant.Token.RefreshToken == "" {
+		return nil, fmt.Errorf("the authorization grant has expired (no refresh token); please re-login")
+	}
+
+	exchangeRequest := RefreshTokenExchangeRequest{
+		TokenEndpoint: grant.TokenEndpoint,
+		ClientID:      grant.ClientID,
+		RefreshToken:  grant.Token.RefreshToken,
+	}
+	tr, err := g.exchanger.ExchangeRefreshToken(exchangeRequest)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not exchange refresh token")
+	}
+	// Guard against an exchanger that reports success without a result, so
+	// the field access below cannot panic.
+	if tr == nil {
+		return nil, errors.New("could not exchange refresh token: empty token result")
+	}
+
+	// RFC 6749 Section 1.5 - token exchange MAY issue a new refresh token (otherwise the result is blank).
+	// also see: https://tools.ietf.org/html/draft-ietf-oauth-security-topics-13#section-4.12
+	if tr.RefreshToken == "" {
+		tr.RefreshToken = grant.Token.RefreshToken
+	}
+
+	// Build a new grant rather than mutating the caller's copy.
+	token := convertToOAuth2Token(tr, g.clock)
+	refreshed := &AuthorizationGrant{
+		Type:          GrantTypeDeviceCode,
+		Audience:      grant.Audience,
+		ClientID:      grant.ClientID,
+		Token:         &token,
+		TokenEndpoint: grant.TokenEndpoint,
+	}
+	return refreshed, nil
+}
diff --git a/oauth2/device_code_flow_test.go b/oauth2/device_code_flow_test.go
new file mode 100644
index 0000000..a238a48
--- /dev/null
+++ b/oauth2/device_code_flow_test.go
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"errors"
+	"time"
+
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+	"github.com/apache/pulsar-client-go/oauth2/clock/testing"
+	"golang.org/x/oauth2"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+// MockDeviceCodeProvider is a test double for the device code provider. Its
+// GetCode method records that it was called and with which arguments, then
+// returns the preconfigured DeviceCodeResult and ReturnsError.
+type MockDeviceCodeProvider struct {
+	Called                     bool
+	CalledWithAudience         string
+	CalledWithAdditionalScopes []string
+	DeviceCodeResult           *DeviceCodeResult
+	ReturnsError               error
+}
+
+// GetCode records the call and its arguments on the mock and hands back the
+// canned result and error configured on it.
+func (cp *MockDeviceCodeProvider) GetCode(audience string, additionalScopes ...string) (*DeviceCodeResult, error) {
+	cp.Called, cp.CalledWithAudience, cp.CalledWithAdditionalScopes = true, audience, additionalScopes
+	return cp.DeviceCodeResult, cp.ReturnsError
+}
+
+// MockDeviceCodeCallback is a test double for the device code callback. Its
+// Callback method records the invocation and the code it received, and
+// returns ReturnsError.
+type MockDeviceCodeCallback struct {
+	Called           bool
+	DeviceCodeResult *DeviceCodeResult
+	ReturnsError     error
+}
+
+// Callback remembers the device code it was handed and returns the error
+// configured on the mock (nil when none was set).
+func (c *MockDeviceCodeCallback) Callback(code *DeviceCodeResult) error {
+	c.Called, c.DeviceCodeResult = true, code
+	return c.ReturnsError
+}
+
+// Specs for DeviceCodeFlow.Authorize, run against a fake clock and mocked
+// code provider, token exchanger and callback.
+var _ = Describe("DeviceCodeFlow", func() {
+
+	Describe("Authorize", func() {
+		const audience = "test_clientID"
+
+		var mockClock clock.Clock
+		var mockCodeProvider *MockDeviceCodeProvider
+		var mockTokenExchanger *MockTokenExchanger
+		var mockCallback *MockDeviceCodeCallback
+		var flow *DeviceCodeFlow
+
+		// Every spec starts from freshly built mocks and a flow wired to them.
+		BeforeEach(func() {
+			mockClock = testing.NewFakeClock(time.Unix(0, 0))
+
+			mockCodeProvider = &MockDeviceCodeProvider{
+				DeviceCodeResult: &DeviceCodeResult{
+					DeviceCode:              "test_deviceCode",
+					UserCode:                "test_userCode",
+					VerificationURI:         "http://verification_uri",
+					VerificationURIComplete: "http://verification_uri_complete",
+					ExpiresIn:               10,
+					Interval:                5,
+				},
+			}
+
+			expectedTokens := TokenResult{AccessToken: "accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
+			mockTokenExchanger = &MockTokenExchanger{
+				ReturnsTokens: &expectedTokens,
+			}
+
+			mockCallback = &MockDeviceCodeCallback{}
+
+			opts := DeviceCodeFlowOptions{
+				IssuerEndpoint:   "http://issuer",
+				ClientID:         "test_clientID",
+				AdditionalScopes: nil,
+				AllowRefresh:     true,
+			}
+			flow = newDeviceCodeFlow(
+				opts,
+				oidcEndpoints,
+				mockCodeProvider,
+				mockTokenExchanger,
+				mockCallback.Callback,
+				mockClock,
+			)
+		})
+
+		// The options above set AllowRefresh, and the spec expects the
+		// "offline_access" scope to reach the code provider.
+		It("invokes DeviceCodeProvider", func() {
+			_, _ = flow.Authorize(audience)
+			Expect(mockCodeProvider.Called).To(BeTrue())
+			Expect(mockCodeProvider.CalledWithAdditionalScopes).To(ContainElement("offline_access"))
+		})
+
+		It("invokes callback with returned code", func() {
+			_, _ = flow.Authorize(audience)
+			Expect(mockCallback.Called).To(BeTrue())
+			Expect(mockCallback.DeviceCodeResult).To(Equal(mockCodeProvider.DeviceCodeResult))
+		})
+
+		// PollInterval is expected to equal the provider's Interval (5) in seconds.
+		It("invokes TokenExchanger with returned code", func() {
+			_, _ = flow.Authorize(audience)
+			Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&DeviceCodeExchangeRequest{
+				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+				ClientID:      "test_clientID",
+				PollInterval:  time.Duration(5) * time.Second,
+				DeviceCode:    "test_deviceCode",
+			}))
+		})
+
+		It("returns an authorization grant", func() {
+			grant, _ := flow.Authorize(audience)
+			Expect(grant).ToNot(BeNil())
+			Expect(grant.Audience).To(Equal(audience))
+			Expect(grant.ClientID).To(Equal("test_clientID"))
+			Expect(grant.ClientCredentials).To(BeNil())
+			Expect(grant.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+			expected := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+			Expect(*grant.Token).To(Equal(expected))
+		})
+	})
+})
+
+// Specs for DeviceAuthorizationGrantRefresher.Refresh, run against a fake
+// clock and a mocked token exchanger.
+var _ = Describe("DeviceAuthorizationGrantRefresher", func() {
+
+	Describe("Refresh", func() {
+		var mockClock clock.Clock
+		var mockTokenExchanger *MockTokenExchanger
+		var refresher *DeviceAuthorizationGrantRefresher
+		var grant *AuthorizationGrant
+
+		// Every spec starts with a device-code grant holding access token "gat"
+		// and refresh token "grt".
+		BeforeEach(func() {
+			mockClock = testing.NewFakeClock(time.Unix(0, 0))
+
+			mockTokenExchanger = &MockTokenExchanger{}
+
+			refresher = &DeviceAuthorizationGrantRefresher{
+				exchanger: mockTokenExchanger,
+				clock:     mockClock,
+			}
+
+			token := oauth2.Token{AccessToken: "gat", RefreshToken: "grt", Expiry: time.Unix(1, 0)}
+			grant = &AuthorizationGrant{
+				Type:          GrantTypeDeviceCode,
+				ClientID:      "test_clientID",
+				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+				Token:         &token,
+			}
+		})
+
+		It("invokes the token exchanger", func() {
+			mockTokenExchanger.ReturnsTokens = &TokenResult{
+				AccessToken: "new token",
+			}
+
+			_, _ = refresher.Refresh(grant)
+			Expect(*mockTokenExchanger.RefreshCalledWithRequest).To(Equal(RefreshTokenExchangeRequest{
+				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+				ClientID:      grant.ClientID,
+				RefreshToken:  "grt",
+			}))
+		})
+
+		It("returns the refreshed access token from the TokenExchanger", func() {
+			mockTokenExchanger.ReturnsTokens = &TokenResult{
+				AccessToken: "new token",
+			}
+
+			grant, _ = refresher.Refresh(grant)
+			Expect(grant.Token.AccessToken).To(Equal(mockTokenExchanger.ReturnsTokens.AccessToken))
+		})
+
+		// The exchanger returns no refresh token here, so the grant's existing
+		// one ("grt") must be carried over.
+		It("preserves the existing refresh token from the TokenExchanger", func() {
+			mockTokenExchanger.ReturnsTokens = &TokenResult{
+				AccessToken: "new token",
+			}
+
+			grant, _ = refresher.Refresh(grant)
+			Expect(grant.Token.RefreshToken).To(Equal("grt"))
+		})
+
+		It("returns the refreshed refresh token from the TokenExchanger", func() {
+			mockTokenExchanger.ReturnsTokens = &TokenResult{
+				AccessToken:  "new token",
+				RefreshToken: "new token",
+			}
+
+			grant, _ = refresher.Refresh(grant)
+			Expect(grant.Token.RefreshToken).To(Equal("new token"))
+		})
+
+		// Expiry is expected to be the fake clock's current time plus ExpiresIn seconds.
+		It("returns a meaningful expiration time", func() {
+			mockTokenExchanger.ReturnsTokens = &TokenResult{
+				AccessToken: "new token",
+				ExpiresIn:   60,
+			}
+
+			grant, _ = refresher.Refresh(grant)
+			Expect(grant.Token.Expiry).To(Equal(mockClock.Now().Add(time.Duration(60) * time.Second)))
+		})
+
+		It("returns an error when TokenExchanger does", func() {
+			mockTokenExchanger.ReturnsError = errors.New("someerror")
+
+			_, err := refresher.Refresh(grant)
+			Expect(err.Error()).To(Equal("could not exchange refresh token: someerror"))
+		})
+	})
+})
diff --git a/oauth2/device_code_provider.go b/oauth2/device_code_provider.go
new file mode 100644
index 0000000..23b226b
--- /dev/null
+++ b/oauth2/device_code_provider.go
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/url"
+	"strconv"
+	"strings"
+)
+
+// LocalDeviceCodeProvider holds the information needed to easily get a
+// device code locally: the provider options (client ID), the OIDC well-known
+// endpoints, and the transport used to send the device code request.
+type LocalDeviceCodeProvider struct {
+	options                LocalDeviceCodeProviderOptions
+	oidcWellKnownEndpoints OIDCWellKnownEndpoints
+	transport              HTTPAuthTransport
+}
+
+// DeviceCodeRequest holds the parameters of a device code request. They are
+// form-encoded by newDeviceCodeRequest as client_id, scope (space-joined)
+// and audience.
+type DeviceCodeRequest struct {
+	ClientID string
+	Scopes   []string
+	Audience string
+}
+
+// DeviceCodeResult holds the device code obtained from the device
+// authorization endpoint. It is decoded from the JSON body of that
+// endpoint's response.
+type DeviceCodeResult struct {
+	DeviceCode              string `json:"device_code"`
+	UserCode                string `json:"user_code"`
+	VerificationURI         string `json:"verification_uri"`
+	VerificationURIComplete string `json:"verification_uri_complete"`
+	ExpiresIn               int    `json:"expires_in"`
+	Interval                int    `json:"interval"`
+}
+
+// LocalDeviceCodeProviderOptions configures a LocalDeviceCodeProvider.
+// ClientID is sent as the client_id of every device code request.
+type LocalDeviceCodeProviderOptions struct {
+	ClientID string
+}
+
+// NewLocalDeviceCodeProvider returns a LocalDeviceCodeProvider wired with the
+// given options, OIDC well-known endpoints and HTTP transport.
+func NewLocalDeviceCodeProvider(
+	options LocalDeviceCodeProviderOptions,
+	oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+	authTransport HTTPAuthTransport) *LocalDeviceCodeProvider {
+	provider := &LocalDeviceCodeProvider{
+		options:                options,
+		oidcWellKnownEndpoints: oidcWellKnownEndpoints,
+		transport:              authTransport,
+	}
+	return provider
+}
+
+// GetCode requests a new device code for the given audience. The scopes
+// "openid" and "email" are always requested; anything passed in
+// additionalScopes is appended after them.
+func (cp *LocalDeviceCodeProvider) GetCode(audience string, additionalScopes ...string) (*DeviceCodeResult, error) {
+	scopes := make([]string, 0, 2+len(additionalScopes))
+	scopes = append(scopes, "openid", "email")
+	scopes = append(scopes, additionalScopes...)
+
+	httpReq, err := cp.newDeviceCodeRequest(&DeviceCodeRequest{
+		ClientID: cp.options.ClientID,
+		Scopes:   scopes,
+		Audience: audience,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	httpResp, err := cp.transport.Do(httpReq)
+	if err != nil {
+		return nil, err
+	}
+
+	result, err := cp.handleDeviceCodeResponse(httpResp)
+	if err != nil {
+		return nil, err
+	}
+	return result, nil
+}
+
+// newDeviceCodeRequest turns a DeviceCodeRequest into a form-encoded POST
+// http.Request addressed to the device authorization endpoint.
+func (cp *LocalDeviceCodeProvider) newDeviceCodeRequest(
+	req *DeviceCodeRequest) (*http.Request, error) {
+	form := url.Values{
+		"client_id": {req.ClientID},
+		"scope":     {strings.Join(req.Scopes, " ")},
+		"audience":  {req.Audience},
+	}
+	payload := form.Encode()
+
+	endpoint := cp.oidcWellKnownEndpoints.DeviceAuthorizationEndpoint
+	httpReq, err := http.NewRequest("POST", endpoint, strings.NewReader(payload))
+	if err != nil {
+		return nil, err
+	}
+
+	httpReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+	httpReq.Header.Add("Content-Length", strconv.Itoa(len(payload)))
+	return httpReq, nil
+}
+
+// handleDeviceCodeResponse validates the device authorization response and
+// decodes its JSON body into a DeviceCodeResult.
+//
+// It returns an error when the status code is outside the 2xx range or when
+// the body cannot be decoded. The response body is closed on every path.
+func (cp *LocalDeviceCodeProvider) handleDeviceCodeResponse(resp *http.Response) (*DeviceCodeResult, error) {
+	// Register the close before any early return so that the body is also
+	// released for non-success responses.
+	if resp.Body != nil {
+		defer resp.Body.Close()
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("a non-success status code was received: %d", resp.StatusCode)
+	}
+
+	dcr := DeviceCodeResult{}
+	if err := json.NewDecoder(resp.Body).Decode(&dcr); err != nil {
+		return nil, err
+	}
+
+	return &dcr, nil
+}
diff --git a/oauth2/go.mod b/oauth2/go.mod
new file mode 100644
index 0000000..d185ef0
--- /dev/null
+++ b/oauth2/go.mod
@@ -0,0 +1,12 @@
+module github.com/apache/pulsar-client-go/oauth2
+
+go 1.13
+
+require (
+	github.com/99designs/keyring v1.1.5
+	github.com/dgrijalva/jwt-go v3.2.0+incompatible
+	github.com/onsi/ginkgo v1.14.0
+	github.com/onsi/gomega v1.10.1
+	github.com/pkg/errors v0.9.1
+	golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
+)
diff --git a/oauth2/go.sum b/oauth2/go.sum
new file mode 100644
index 0000000..5063d2f
--- /dev/null
+++ b/oauth2/go.sum
@@ -0,0 +1,113 @@
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
+github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
+github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
+github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
+github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
+github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
+github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
+github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
+k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19 h1:7Nu2dTj82c6IaWvL7hImJzcXoTPz1MsSCH7r+0m6rfo=
+k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
diff --git a/oauth2/oidc_endpoint_provider.go b/oauth2/oidc_endpoint_provider.go
new file mode 100644
index 0000000..32986b7
--- /dev/null
+++ b/oauth2/oidc_endpoint_provider.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/url"
+	"path"
+
+	"github.com/pkg/errors"
+)
+
+// OIDCWellKnownEndpoints holds the well known OIDC endpoints. It is decoded
+// from the JSON document served at <issuer>/.well-known/openid-configuration.
+type OIDCWellKnownEndpoints struct {
+	AuthorizationEndpoint       string `json:"authorization_endpoint"`
+	TokenEndpoint               string `json:"token_endpoint"`
+	DeviceAuthorizationEndpoint string `json:"device_authorization_endpoint"`
+}
+
+// GetOIDCWellKnownEndpointsFromIssuerURL gets the well known endpoints for the
+// passed in issuer url by fetching <issuerURL>/.well-known/openid-configuration
+// and decoding the returned JSON document.
+//
+// An error is returned when the issuer URL cannot be parsed, when the HTTP
+// request fails, when the server answers with a non-2xx status code, or when
+// the body is not valid JSON.
+func GetOIDCWellKnownEndpointsFromIssuerURL(issuerURL string) (*OIDCWellKnownEndpoints, error) {
+	u, err := url.Parse(issuerURL)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse issuer url to build well known endpoints")
+	}
+	u.Path = path.Join(u.Path, ".well-known/openid-configuration")
+	wellKnownURL := u.String()
+
+	r, err := http.Get(wellKnownURL)
+	if err != nil {
+		return nil, errors.Wrapf(err, "could not get well known endpoints from url %s", wellKnownURL)
+	}
+	defer r.Body.Close()
+
+	// http.Get does not treat a non-2xx status as an error, so reject it here
+	// instead of trying to decode an error page as the discovery document.
+	if r.StatusCode < 200 || r.StatusCode >= 300 {
+		return nil, errors.Errorf(
+			"could not get well known endpoints from url %s: a non-success status code was received: %d",
+			wellKnownURL, r.StatusCode)
+	}
+
+	var wkEndpoints OIDCWellKnownEndpoints
+	err = json.NewDecoder(r.Body).Decode(&wkEndpoints)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not decode json body when getting well known endpoints")
+	}
+
+	return &wkEndpoints, nil
+}
diff --git a/oauth2/oidc_endpoint_provider_test.go b/oauth2/oidc_endpoint_provider_test.go
new file mode 100644
index 0000000..4ebce3b
--- /dev/null
+++ b/oauth2/oidc_endpoint_provider_test.go
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+)
+
+// Specs for GetOIDCWellKnownEndpointsFromIssuerURL. Error assertions only pin
+// the text produced by this package: the wording of wrapped net/url, net/http
+// and resolver errors differs between Go releases and environments.
+var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() {
+	It("calls and gets the well known data from the correct endpoint for the issuer", func() {
+		var req *http.Request
+		wkEndpointsResp := OIDCWellKnownEndpoints{
+			AuthorizationEndpoint: "the-auth-endpoint", TokenEndpoint: "the-token-endpoint"}
+		responseBytes, err := json.Marshal(wkEndpointsResp)
+		Expect(err).ToNot(HaveOccurred())
+
+		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			req = r
+
+			w.WriteHeader(http.StatusOK)
+			_, _ = w.Write(responseBytes)
+
+		}))
+		defer ts.Close()
+
+		endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
+
+		Expect(err).ToNot(HaveOccurred())
+		Expect(*endpoints).To(Equal(wkEndpointsResp))
+		Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+	})
+
+	It("errors when url.Parse errors", func() {
+		endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("://")
+
+		Expect(err).To(HaveOccurred())
+		// Go 1.14+ quotes the URL inside the wrapped parse error, so match the
+		// stable parts only.
+		Expect(err.Error()).To(HavePrefix(
+			"could not parse issuer url to build well known endpoints: "))
+		Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		Expect(endpoints).To(BeNil())
+	})
+
+	It("errors when the get errors", func() {
+		endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("https://")
+
+		Expect(err).To(HaveOccurred())
+		// The tail of the message comes from net/http and the resolver and is
+		// environment dependent.
+		Expect(err.Error()).To(HavePrefix("could not get well known endpoints from url "))
+		Expect(endpoints).To(BeNil())
+	})
+
+	It("errors when the json decoder errors", func() {
+		var req *http.Request
+
+		ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+			req = r
+
+			w.WriteHeader(http.StatusOK)
+			_, _ = w.Write([]byte("<"))
+
+		}))
+		defer ts.Close()
+
+		endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
+
+		Expect(err).To(HaveOccurred())
+		Expect(err.Error()).To(Equal("could not decode json body when getting well" +
+			" known endpoints: invalid character '<' looking for beginning of value"))
+		Expect(endpoints).To(BeNil())
+		Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+	})
+})
diff --git a/oauth2/store/keyring.go b/oauth2/store/keyring.go
new file mode 100644
index 0000000..fd49baf
--- /dev/null
+++ b/oauth2/store/keyring.go
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+	"crypto/sha1"
+	"encoding/json"
+	"fmt"
+	"sync"
+
+	"github.com/99designs/keyring"
+	"github.com/apache/pulsar-client-go/oauth2"
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+// KeyringStore is a Store that persists authorization grants in a keyring.
+// The mutex serializes SaveGrant, LoadGrant, WhoAmI and Logout.
+type KeyringStore struct {
+	kr    keyring.Keyring
+	clock clock.Clock
+	lock  sync.Mutex
+}
+
+// storedItem represents an item stored in the keyring. The keyring key is a
+// hash of Audience, UserName is kept as the item's label, and Grant is stored
+// as the item's JSON-encoded data.
+type storedItem struct {
+	Audience string
+	UserName string
+	Grant    oauth2.AuthorizationGrant
+}
+
+// NewKeyringStore returns a store that keeps grants in the given keyring and
+// uses the real clock. The returned error is always nil.
+func NewKeyringStore(kr keyring.Keyring) (*KeyringStore, error) {
+	store := &KeyringStore{clock: clock.RealClock{}}
+	store.kr = kr
+	return store, nil
+}
+
+// Compile-time check that KeyringStore satisfies Store.
+var _ Store = (*KeyringStore)(nil)
+
+// SaveGrant stores grant in the keyring under the given audience, labelled
+// with the user name derived from the grant: the client e-mail for client
+// credentials, or the name extracted from the token for device-code grants.
+// ErrUnsupportedAuthData is returned for any other grant type, or when the
+// data required by the grant type is missing.
+func (f *KeyringStore) SaveGrant(audience string, grant oauth2.AuthorizationGrant) error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	item := storedItem{Audience: audience, Grant: grant}
+	switch grant.Type {
+	case oauth2.GrantTypeClientCredentials:
+		creds := grant.ClientCredentials
+		if creds == nil {
+			return ErrUnsupportedAuthData
+		}
+		item.UserName = creds.ClientEmail
+	case oauth2.GrantTypeDeviceCode:
+		if grant.Token == nil {
+			return ErrUnsupportedAuthData
+		}
+		name, err := oauth2.ExtractUserName(*grant.Token)
+		if err != nil {
+			return err
+		}
+		item.UserName = name
+	default:
+		return ErrUnsupportedAuthData
+	}
+	return f.setItem(item)
+}
+
+// LoadGrant reads the grant stored for audience. It returns
+// ErrNoAuthenticationData when the keyring has no entry for it, and
+// ErrUnsupportedAuthData when the stored grant has an unknown type or lacks
+// the data its type requires.
+func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	item, err := f.getItem(audience)
+	if err == keyring.ErrKeyNotFound {
+		return nil, ErrNoAuthenticationData
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	stored := &item.Grant
+	hasCredentials := stored.Type == oauth2.GrantTypeClientCredentials && stored.ClientCredentials != nil
+	hasToken := stored.Type == oauth2.GrantTypeDeviceCode && stored.Token != nil
+	if !hasCredentials && !hasToken {
+		return nil, ErrUnsupportedAuthData
+	}
+	return stored, nil
+}
+
+// WhoAmI returns the label (user name) recorded on the keyring item for
+// audience, or ErrNoAuthenticationData when no such item exists.
+func (f *KeyringStore) WhoAmI(audience string) (string, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	meta, err := f.kr.GetMetadata(hashKeyringKey(audience))
+	switch err {
+	case nil:
+		return meta.Label, nil
+	case keyring.ErrKeyNotFound:
+		return "", ErrNoAuthenticationData
+	default:
+		return "", fmt.Errorf("unable to get information from the keyring: %v", err)
+	}
+}
+
+// Logout removes every key from the keyring.
+//
+// Removal is best-effort: all keys are attempted even if some fail, and the
+// first failure encountered is reported once the loop has finished.
+func (f *KeyringStore) Logout() error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	keys, err := f.kr.Keys()
+	if err != nil {
+		return fmt.Errorf("unable to get information from the keyring: %v", err)
+	}
+
+	// Remember the first failure rather than overwriting it on every iteration;
+	// otherwise only an error from the last key would ever be reported.
+	var firstErr error
+	for _, key := range keys {
+		if err := f.kr.Remove(key); err != nil && firstErr == nil {
+			firstErr = err
+		}
+	}
+	if firstErr != nil {
+		return fmt.Errorf("unable to update the keyring: %v", firstErr)
+	}
+	return nil
+}
+
+// getItem fetches the keyring entry for audience and decodes its data as an
+// authorization grant. Keyring errors are returned unchanged; data that does
+// not decode yields ErrUnsupportedAuthData.
+func (f *KeyringStore) getItem(audience string) (storedItem, error) {
+	entry, err := f.kr.Get(hashKeyringKey(audience))
+	if err != nil {
+		return storedItem{}, err
+	}
+
+	result := storedItem{Audience: audience, UserName: entry.Label}
+	if err := json.Unmarshal(entry.Data, &result.Grant); err != nil {
+		// the grant appears to be invalid
+		return storedItem{}, ErrUnsupportedAuthData
+	}
+	return result, nil
+}
+
+// setItem writes item to the keyring: the grant is JSON-encoded as the entry
+// data, the user name becomes the label, and the key is the hashed audience.
+func (f *KeyringStore) setItem(item storedItem) error {
+	payload, err := json.Marshal(item.Grant)
+	if err != nil {
+		return err
+	}
+
+	entry := keyring.Item{
+		Key:                         hashKeyringKey(item.Audience),
+		Data:                        payload,
+		Label:                       item.UserName,
+		Description:                 "authorization grant",
+		KeychainNotTrustApplication: false,
+		KeychainNotSynchronizable:   false,
+	}
+	if err := f.kr.Set(entry); err != nil {
+		return fmt.Errorf("unable to update the keyring: %v", err)
+	}
+	return nil
+}
+
+// hashKeyringKey derives a keyring-safe key from s: the lowercase hex
+// encoding of its SHA-1 digest.
+func hashKeyringKey(s string) string {
+	digest := sha1.Sum([]byte(s))
+	return fmt.Sprintf("%x", digest[:])
+}
diff --git a/oauth2/store/memory.go b/oauth2/store/memory.go
new file mode 100644
index 0000000..07c7594
--- /dev/null
+++ b/oauth2/store/memory.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+	"sync"
+
+	"github.com/apache/pulsar-client-go/oauth2"
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+type MemoryStore struct {
+	clock  clock.Clock
+	lock   sync.Mutex
+	grants map[string]*oauth2.AuthorizationGrant
+}
+
+func NewMemoryStore() Store {
+	return &MemoryStore{
+		clock:  clock.RealClock{},
+		grants: make(map[string]*oauth2.AuthorizationGrant),
+	}
+}
+
+var _ Store = &MemoryStore{}
+
+func (f *MemoryStore) SaveGrant(audience string, grant oauth2.AuthorizationGrant) error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.grants[audience] = &grant
+	return nil
+}
+
+func (f *MemoryStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	grant, ok := f.grants[audience]
+	if !ok {
+		return nil, ErrNoAuthenticationData
+	}
+	return grant, nil
+}
+
+func (f *MemoryStore) WhoAmI(audience string) (string, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	grant, ok := f.grants[audience]
+	if !ok {
+		return "", ErrNoAuthenticationData
+	}
+	switch grant.Type {
+	case oauth2.GrantTypeClientCredentials:
+		if grant.ClientCredentials == nil {
+			return "", ErrUnsupportedAuthData
+		}
+		return grant.ClientCredentials.ClientEmail, nil
+	case oauth2.GrantTypeDeviceCode:
+		if grant.Token == nil {
+			return "", ErrUnsupportedAuthData
+		}
+		return oauth2.ExtractUserName(*grant.Token)
+	default:
+		return "", ErrUnsupportedAuthData
+	}
+}
+
+func (f *MemoryStore) Logout() error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+	f.grants = map[string]*oauth2.AuthorizationGrant{}
+	return nil
+}
diff --git a/oauth2/store/store.go b/oauth2/store/store.go
new file mode 100644
index 0000000..55d4c9e
--- /dev/null
+++ b/oauth2/store/store.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+	"errors"
+
+	"github.com/apache/pulsar-client-go/oauth2"
+)
+
+// ErrNoAuthenticationData indicates that stored authentication data is not available
+var ErrNoAuthenticationData = errors.New("authentication data is not available")
+
+// ErrUnsupportedAuthData indicates that stored authentication data is unusable
+var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
+
+// Store is responsible for persisting authorization grants
+type Store interface {
+	// SaveGrant stores an authorization grant for a given audience
+	SaveGrant(audience string, grant oauth2.AuthorizationGrant) error
+
+	// LoadGrant loads an authorization grant for a given audience
+	LoadGrant(audience string) (*oauth2.AuthorizationGrant, error)
+
+	// WhoAmI returns the current user name (or an error if nobody is logged in)
+	WhoAmI(audience string) (string, error)
+
+	// Logout deletes all stored credentials
+	Logout() error
+}
diff --git a/pulsar/client.go b/pulsar/client.go
index d4af906..460b275 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -62,6 +62,11 @@ func NewAuthenticationAthenz(authParams map[string]string) Authentication {
 	return athenz
 }
 
+func NewAuthenticationOAuth2(authParams map[string]string) Authentication {
+	oauth, _ := auth.NewAuthenticationOAuth2WithParams(authParams)
+	return oauth
+}
+
 // Builder interface that is used to construct a Pulsar Client instance.
 type ClientOptions struct {
 	// Configure the service URL for the Pulsar service.
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 308c8e0..276e4fb 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -20,9 +20,13 @@ package pulsar
 import (
 	"fmt"
 	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"os"
 	"testing"
 	"time"
 
+	"github.com/apache/pulsar-client-go/pulsar/internal/auth"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -219,6 +223,96 @@ func TestTokenAuthFromFile(t *testing.T) {
 	client.Close()
 }
 
+// mockOAuthServer starts a mock OAuth service for the tests
+func mockOAuthServer() *httptest.Server {
+	// prepare a port for the mocked server
+	server := httptest.NewUnstartedServer(http.DefaultServeMux)
+
+	// mock the used REST path for the tests
+	mockedHandler := http.NewServeMux()
+	mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, request *http.Request) {
+		s := fmt.Sprintf(`{
+    "issuer":"%s",
+    "authorization_endpoint":"%s/authorize",
+    "token_endpoint":"%s/oauth/token",
+    "device_authorization_endpoint":"%s/oauth/device/code"
+}`, server.URL, server.URL, server.URL, server.URL)
+		fmt.Fprintln(writer, s)
+	})
+	mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, request *http.Request) {
+		fmt.Fprintln(writer, "{\n"+
+			"  \"access_token\": \"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0b2tlbi1wcmluY2lwYWwifQ."+
+			"tSfgR8l7dKC6LoWCxQgNkuSB8our7xV_nAM7wpgCbG4\",\n"+
+			"  \"token_type\": \"Bearer\"\n}")
+	})
+	mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, request *http.Request) {
+		fmt.Fprintln(writer, "true")
+	})
+
+	server.Config.Handler = mockedHandler
+	server.Start()
+
+	return server
+}
+
+// mockKeyFile will mock a temp key file for testing.
+func mockKeyFile(server string) (string, error) {
+	pwd, err := os.Getwd()
+	if err != nil {
+		return "", err
+	}
+	kf, err := ioutil.TempFile(pwd, "test_oauth2")
+	if err != nil {
+		return "", err
+	}
+	defer kf.Close() // release the descriptor; callers only need the path
+	_, err = kf.WriteString(fmt.Sprintf(`{
+  "type":"sn_service_account",
+  "client_id":"client-id",
+  "client_secret":"client-secret",
+  "client_email":"oauth@test.org",
+  "issuer_url":"%s"
+}`, server))
+	if err != nil {
+		return "", err
+	}
+	return kf.Name(), nil
+}
+
+func TestOAuth2Auth(t *testing.T) {
+	server := mockOAuthServer()
+	defer server.Close()
+	kf, err := mockKeyFile(server.URL)
+	defer os.Remove(kf)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	params := map[string]string{
+		auth.ConfigParamType:      auth.ConfigParamTypeClientCredentials,
+		auth.ConfigParamIssuerURL: server.URL,
+		auth.ConfigParamClientID:  "client-id",
+		auth.ConfigParamAudience:  "audience",
+		auth.ConfigParamKeyFile:   kf,
+	}
+
+	oauth := NewAuthenticationOAuth2(params)
+	client, err := NewClient(ClientOptions{
+		URL:            serviceURL,
+		Authentication: oauth,
+	})
+	assert.NoError(t, err)
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newAuthTopicName(),
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+
+	client.Close()
+}
+
 func TestTopicPartitions(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: "pulsar://localhost:6650",
diff --git a/pulsar/internal/auth/oauth2.go b/pulsar/internal/auth/oauth2.go
new file mode 100644
index 0000000..9ee63ab
--- /dev/null
+++ b/pulsar/internal/auth/oauth2.go
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+	"crypto/tls"
+	"fmt"
+
+	"github.com/apache/pulsar-client-go/oauth2"
+	"github.com/apache/pulsar-client-go/oauth2/cache"
+	"github.com/apache/pulsar-client-go/oauth2/clock"
+	"github.com/apache/pulsar-client-go/oauth2/store"
+)
+
+const (
+	ConfigParamType                  = "type"
+	ConfigParamTypeClientCredentials = "client_credentials"
+	ConfigParamIssuerURL             = "issuerUrl"
+	ConfigParamAudience              = "audience"
+	ConfigParamKeyFile               = "privateKey"
+	ConfigParamClientID              = "clientId"
+)
+
+type oauth2AuthProvider struct {
+	clock  clock.Clock
+	issuer oauth2.Issuer
+	store  store.Store
+	source cache.CachingTokenSource
+}
+
+// NewAuthenticationOAuth2WithParams returns a Provider built from a string map of parameters.
+func NewAuthenticationOAuth2WithParams(params map[string]string) (Provider, error) {
+	issuer := oauth2.Issuer{
+		IssuerEndpoint: params[ConfigParamIssuerURL],
+		ClientID:       params[ConfigParamClientID],
+		Audience:       params[ConfigParamAudience],
+	}
+
+	// initialize a store of authorization grants
+	st := store.NewMemoryStore()
+	switch params[ConfigParamType] {
+	case ConfigParamTypeClientCredentials:
+		flow, err := oauth2.NewDefaultClientCredentialsFlow(oauth2.ClientCredentialsFlowOptions{
+			KeyFile:          params[ConfigParamKeyFile],
+			AdditionalScopes: nil,
+		})
+		if err != nil {
+			return nil, err
+		}
+		grant, err := flow.Authorize(issuer.Audience)
+		if err != nil {
+			return nil, err
+		}
+		err = st.SaveGrant(issuer.Audience, *grant)
+		if err != nil {
+			return nil, err
+		}
+	default:
+		return nil, fmt.Errorf("unsupported authentication type: %s", params[ConfigParamType])
+	}
+
+	return NewAuthenticationOAuth2(issuer, st), nil
+}
+
+func NewAuthenticationOAuth2(
+	issuer oauth2.Issuer,
+	store store.Store) Provider {
+
+	return &oauth2AuthProvider{
+		clock:  clock.RealClock{},
+		issuer: issuer,
+		store:  store,
+	}
+}
+
+func (p *oauth2AuthProvider) Init() error {
+	grant, err := p.store.LoadGrant(p.issuer.Audience)
+	if err != nil {
+		if err == store.ErrNoAuthenticationData {
+			return nil
+		}
+		return err
+	}
+	refresher, err := p.getRefresher(grant.Type)
+	if err != nil {
+		return err
+	}
+
+	source, err := cache.NewDefaultTokenCache(p.store, p.issuer.Audience, refresher)
+	if err != nil {
+		return err
+	}
+	p.source = source
+	return nil
+}
+
+func (p *oauth2AuthProvider) Name() string {
+	return "token"
+}
+
+func (p *oauth2AuthProvider) GetTLSCertificate() (*tls.Certificate, error) {
+	return nil, nil
+}
+
+func (p *oauth2AuthProvider) GetData() ([]byte, error) {
+	if p.source == nil {
+		// anonymous access
+		return nil, nil
+	}
+	token, err := p.source.Token()
+	if err != nil {
+		return nil, err
+	}
+	return []byte(token.AccessToken), nil
+}
+
+func (p *oauth2AuthProvider) Close() error {
+	return nil
+}
+
+func (p *oauth2AuthProvider) getRefresher(t oauth2.AuthorizationGrantType) (oauth2.AuthorizationGrantRefresher, error) {
+	switch t {
+	case oauth2.GrantTypeClientCredentials:
+		return oauth2.NewDefaultClientCredentialsGrantRefresher(p.clock)
+	case oauth2.GrantTypeDeviceCode:
+		return oauth2.NewDefaultDeviceAuthorizationGrantRefresher(p.clock)
+	default:
+		return nil, store.ErrUnsupportedAuthData
+	}
+}
diff --git a/pulsar/internal/auth/oauth2_test.go b/pulsar/internal/auth/oauth2_test.go
new file mode 100644
index 0000000..c8a4830
--- /dev/null
+++ b/pulsar/internal/auth/oauth2_test.go
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+	"fmt"
+	"io/ioutil"
+	"net/http"
+	"net/http/httptest"
+	"os"
+	"testing"
+
+	"github.com/pkg/errors"
+	"github.com/stretchr/testify/assert"
+)
+
+// mockOAuthServer starts a mock OAuth service for the tests
+func mockOAuthServer() *httptest.Server {
+	// prepare a port for the mocked server
+	server := httptest.NewUnstartedServer(http.DefaultServeMux)
+
+	// mock the used REST path for the tests
+	mockedHandler := http.NewServeMux()
+	mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, request *http.Request) {
+		s := fmt.Sprintf(`{
+    "issuer":"%s",
+    "authorization_endpoint":"%s/authorize",
+    "token_endpoint":"%s/oauth/token",
+    "device_authorization_endpoint":"%s/oauth/device/code"
+}`, server.URL, server.URL, server.URL, server.URL)
+		fmt.Fprintln(writer, s)
+	})
+	mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, request *http.Request) {
+		fmt.Fprintln(writer, "{\n  \"access_token\": \"token-content\",\n  \"token_type\": \"Bearer\"\n}")
+	})
+	mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, request *http.Request) {
+		fmt.Fprintln(writer, "true")
+	})
+
+	server.Config.Handler = mockedHandler
+	server.Start()
+
+	return server
+}
+
+// mockKeyFile will mock a temp key file for testing.
+func mockKeyFile(server string) (string, error) {
+	pwd, err := os.Getwd()
+	if err != nil {
+		return "", err
+	}
+	kf, err := ioutil.TempFile(pwd, "test_oauth2")
+	if err != nil {
+		return "", err
+	}
+	defer kf.Close() // release the descriptor; callers only need the path
+	_, err = kf.WriteString(fmt.Sprintf(`{
+  "type":"sn_service_account",
+  "client_id":"client-id",
+  "client_secret":"client-secret",
+  "client_email":"oauth@test.org",
+  "issuer_url":"%s"
+}`, server))
+	if err != nil {
+		return "", err
+	}
+	return kf.Name(), nil
+}
+
+func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
+	server := mockOAuthServer()
+	defer server.Close()
+	kf, err := mockKeyFile(server.URL)
+	defer os.Remove(kf)
+	if err != nil {
+		t.Fatal(errors.Wrap(err, "create mocked key file failed"))
+	}
+
+	params := map[string]string{
+		ConfigParamType:      ConfigParamTypeClientCredentials,
+		ConfigParamIssuerURL: server.URL,
+		ConfigParamClientID:  "client-id",
+		ConfigParamAudience:  "audience",
+		ConfigParamKeyFile:   kf,
+	}
+
+	auth, err := NewAuthenticationOAuth2WithParams(params)
+	if err != nil {
+		t.Fatal(err)
+	}
+	err = auth.Init()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	token, err := auth.GetData()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Equal(t, "token-content", string(token))
+}
diff --git a/pulsar/internal/auth/token.go b/pulsar/internal/auth/token.go
index e6a6d97..d8d0cd3 100644
--- a/pulsar/internal/auth/token.go
+++ b/pulsar/internal/auth/token.go
@@ -102,6 +102,6 @@ func (p *tokenAuthProvider) GetData() ([]byte, error) {
 	return []byte(t), nil
 }
 
-func (tokenAuthProvider) Close() error {
+func (p *tokenAuthProvider) Close() error {
 	return nil
 }