You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/07/15 08:36:34 UTC
[pulsar-client-go] branch master updated: Support oauth2
authentication for pulsar-client-go (#313)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new b9f8c5c Support oauth2 authentication for pulsar-client-go (#313)
b9f8c5c is described below
commit b9f8c5cedefb1bfb82faee09ce06f48c8f7300f5
Author: Yong Zhang <zh...@gmail.com>
AuthorDate: Wed Jul 15 16:36:26 2020 +0800
Support oauth2 authentication for pulsar-client-go (#313)
* Authentication provider for OAuth 2.0
- based on cloud-cli @ bc645b16ca7b7474b132ee1da8b56da35025a616
* Add tests and Update license
* Revert zstd version
* Refactor to support multiple issuers.
- decouple the issuer parameter from the audience
- use issuer information that is in the keyfile
* Address comments
* Add tests
* Change clock package
Co-authored-by: Eron Wright <ew...@streamnative.io>
---
go.mod | 7 +-
go.sum | 64 +++++-
oauth2/auth.go | 120 ++++++++++
oauth2/auth_suite_test.go | 65 ++++++
oauth2/authorization_tokenretriever.go | 338 ++++++++++++++++++++++++++++
oauth2/authorization_tokenretriever_test.go | 338 ++++++++++++++++++++++++++++
oauth2/cache/cache.go | 142 ++++++++++++
oauth2/client_credentials_flow.go | 158 +++++++++++++
oauth2/client_credentials_flow_test.go | 183 +++++++++++++++
oauth2/client_credentials_provider.go | 66 ++++++
oauth2/clock/clock.go | 98 ++++++++
oauth2/clock/testing/fake_clock.go | 275 ++++++++++++++++++++++
oauth2/config_tokenprovider.go | 57 +++++
oauth2/config_tokenprovider_test.go | 91 ++++++++
oauth2/device_code_flow.go | 203 +++++++++++++++++
oauth2/device_code_flow_test.go | 230 +++++++++++++++++++
oauth2/device_code_provider.go | 133 +++++++++++
oauth2/go.mod | 12 +
oauth2/go.sum | 113 ++++++++++
oauth2/oidc_endpoint_provider.go | 58 +++++
oauth2/oidc_endpoint_provider_test.go | 92 ++++++++
oauth2/store/keyring.go | 194 ++++++++++++++++
oauth2/store/memory.go | 87 +++++++
oauth2/store/store.go | 45 ++++
pulsar/client.go | 5 +
pulsar/client_impl_test.go | 94 ++++++++
pulsar/internal/auth/oauth2.go | 145 ++++++++++++
pulsar/internal/auth/oauth2_test.go | 117 ++++++++++
pulsar/internal/auth/token.go | 2 +-
29 files changed, 3524 insertions(+), 8 deletions(-)
diff --git a/go.mod b/go.mod
index afae8cc..969b35a 100644
--- a/go.mod
+++ b/go.mod
@@ -1,9 +1,10 @@
module github.com/apache/pulsar-client-go
-go 1.12
+go 1.13
require (
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32
+ github.com/apache/pulsar-client-go/oauth2 v0.0.0-00010101000000-000000000000
github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6
github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b
github.com/gogo/protobuf v1.3.1
@@ -11,7 +12,7 @@ require (
github.com/klauspost/compress v1.10.8
github.com/kr/pretty v0.2.0 // indirect
github.com/pierrec/lz4 v2.0.5+incompatible
- github.com/pkg/errors v0.8.1
+ github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.7.1
github.com/sirupsen/logrus v1.4.2
github.com/spaolacci/murmur3 v1.1.0
@@ -20,3 +21,5 @@ require (
github.com/stretchr/testify v1.4.0
github.com/yahoo/athenz v1.8.55
)
+
+replace github.com/apache/pulsar-client-go/oauth2 => ./oauth2
diff --git a/go.sum b/go.sum
index 9f51dbb..e766ca5 100644
--- a/go.sum
+++ b/go.sum
@@ -1,3 +1,6 @@
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
+github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32 h1:/gZKpgSMydtrih81nvUhlkXpZIUfthKShSCVbRzBt9Y=
github.com/DataDog/zstd v1.4.6-0.20200617134701-89f69fb7df32/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
@@ -19,16 +22,27 @@ github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D
github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
github.com/cespare/xxhash/v2 v2.1.1 h1:6MnRN8NT7+YBpUIWxHtefFZOKTAPgGjpQSxqLNn0+qY=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
+github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
+github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9GBnD5lWE=
github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
github.com/gogo/protobuf v1.3.1 h1:DqDEcV5aeaTmdFBePNpYsp3FlcVH/2ISVVM9Qf8PSls=
github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
@@ -49,6 +63,9 @@ github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
@@ -56,6 +73,8 @@ github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6Pyu
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7VTCxuUUipMqKk8s4w=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00=
github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck=
github.com/klauspost/compress v1.10.8 h1:eLeJ3dr/Y9+XRfJT4l+8ZjmtB5RPJhucH2HeCV5+IZY=
@@ -71,16 +90,30 @@ github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0jegS5sx/RkqARlsWZ6pIwiU=
github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0=
+github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
+github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
+github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
+github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
-github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
@@ -112,7 +145,6 @@ github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
-github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
@@ -120,28 +152,43 @@ github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190613194153-d28f0bde5980/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
+golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1 h1:ogLJMz+qpzav7lGMh10LMvAkM/fAoGlaiiHYiFYdm80=
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20181030221726-6c7e314b6563/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -149,6 +196,8 @@ golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
@@ -158,13 +207,18 @@ google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyz
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15 h1:YR8cESwS4TdDjEe65xsg0ogRM/Nc3DYOhEAlW+xobZo=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
-gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/oauth2/auth.go b/oauth2/auth.go
new file mode 100644
index 0000000..867afae
--- /dev/null
+++ b/oauth2/auth.go
@@ -0,0 +1,120 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "fmt"
+ "time"
+
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+ "github.com/dgrijalva/jwt-go"
+ "golang.org/x/oauth2"
+)
+
+const (
+ ClaimNameUserName = "https://pulsar.apache.org/username"
+)
+
+// Flow abstracts an OAuth 2.0 authentication and authorization flow
+type Flow interface {
+ // Authorize obtains an authorization grant based on an OAuth 2.0 authorization flow.
+ // The method returns a grant which may contain an initial access token.
+ Authorize(audience string) (*AuthorizationGrant, error)
+}
+
+// AuthorizationGrantRefresher refreshes OAuth 2.0 authorization grants
+type AuthorizationGrantRefresher interface {
+ // Refresh refreshes an authorization grant to contain a fresh access token
+ Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error)
+}
+
+type AuthorizationGrantType string
+
+const (
+ // GrantTypeClientCredentials represents a client credentials grant
+ GrantTypeClientCredentials AuthorizationGrantType = "client_credentials"
+
+ // GrantTypeDeviceCode represents a device code grant
+ GrantTypeDeviceCode AuthorizationGrantType = "device_code"
+)
+
+// AuthorizationGrant is a credential representing the resource owner's authorization
+// to access its protected resources, and is used by the client to obtain an access token
+type AuthorizationGrant struct {
+ // Type describes the type of authorization grant represented by this structure
+ Type AuthorizationGrantType `json:"type"`
+
+ // Audience is the intended audience of the access tokens
+ Audience string `json:"audience,omitempty"`
+
+ // ClientID is an OAuth2 client identifier used by some flows
+ ClientID string `json:"client_id,omitempty"`
+
+ // ClientCredentials is credentials data for the client credentials grant type
+ ClientCredentials *KeyFile `json:"client_credentials,omitempty"`
+
+ // TokenEndpoint is the endpoint that tokens are requested from
+ TokenEndpoint string `json:"token_endpoint"`
+
+ // Token contains an access token in the client credentials grant type,
+ // and a refresh token in the device authorization grant type
+ Token *oauth2.Token `json:"token,omitempty"`
+}
+
+// TokenResult holds token information
+type TokenResult struct {
+ AccessToken string `json:"access_token"`
+ IDToken string `json:"id_token"`
+ RefreshToken string `json:"refresh_token"`
+ ExpiresIn int `json:"expires_in"`
+}
+
+// Issuer holds information about the issuer of tokens
+type Issuer struct {
+ IssuerEndpoint string
+ ClientID string
+ Audience string
+}
+
+func convertToOAuth2Token(token *TokenResult, clock clock.Clock) oauth2.Token {
+ return oauth2.Token{
+ AccessToken: token.AccessToken,
+ TokenType: "bearer",
+ RefreshToken: token.RefreshToken,
+ Expiry: clock.Now().Add(time.Duration(token.ExpiresIn) * time.Second),
+ }
+}
+
+// ExtractUserName extracts the username claim from the given token's access token
+func ExtractUserName(token oauth2.Token) (string, error) {
+ p := jwt.Parser{}
+ claims := jwt.MapClaims{}
+ if _, _, err := p.ParseUnverified(token.AccessToken, claims); err != nil {
+ return "", fmt.Errorf("unable to decode the access token: %v", err)
+ }
+ username, ok := claims[ClaimNameUserName]
+ if !ok {
+ return "", fmt.Errorf("access token doesn't contain a username claim")
+ }
+ switch v := username.(type) {
+ case string:
+ return v, nil
+ default:
+ return "", fmt.Errorf("access token contains an unsupported username claim")
+ }
+}
diff --git a/oauth2/auth_suite_test.go b/oauth2/auth_suite_test.go
new file mode 100644
index 0000000..95accff
--- /dev/null
+++ b/oauth2/auth_suite_test.go
@@ -0,0 +1,65 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "context"
+ "testing"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+func TestAuth(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "cloud-cli Auth Suite")
+}
+
+type MockTokenExchanger struct {
+ CalledWithRequest interface{}
+ ReturnsTokens *TokenResult
+ ReturnsError error
+ RefreshCalledWithRequest *RefreshTokenExchangeRequest
+}
+
+func (te *MockTokenExchanger) ExchangeCode(req AuthorizationCodeExchangeRequest) (*TokenResult, error) {
+ te.CalledWithRequest = &req
+ return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeRefreshToken(req RefreshTokenExchangeRequest) (*TokenResult, error) {
+ te.RefreshCalledWithRequest = &req
+ return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeClientCredentials(req ClientCredentialsExchangeRequest) (*TokenResult, error) {
+ te.CalledWithRequest = &req
+ return te.ReturnsTokens, te.ReturnsError
+}
+
+func (te *MockTokenExchanger) ExchangeDeviceCode(ctx context.Context,
+ req DeviceCodeExchangeRequest) (*TokenResult, error) {
+ te.CalledWithRequest = &req
+ return te.ReturnsTokens, te.ReturnsError
+}
+
+var oidcEndpoints = OIDCWellKnownEndpoints{
+ AuthorizationEndpoint: "http://issuer/auth/authorize",
+ TokenEndpoint: "http://issuer/auth/token",
+ DeviceAuthorizationEndpoint: "http://issuer/auth/authorize/device",
+}
diff --git a/oauth2/authorization_tokenretriever.go b/oauth2/authorization_tokenretriever.go
new file mode 100644
index 0000000..93c1bfe
--- /dev/null
+++ b/oauth2/authorization_tokenretriever.go
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "context"
+ "encoding/json"
+ "errors"
+ "fmt"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+ "time"
+)
+
+// TokenRetriever implements AuthTokenExchanger in order to facilitate getting
+// Tokens
+type TokenRetriever struct {
+ transport HTTPAuthTransport
+}
+
+// AuthorizationTokenResponse is the HTTP response when asking for a new token.
+// Note that, depending on the kind of request that was sent, not all fields
+// will contain data
+type AuthorizationTokenResponse struct {
+ AccessToken string `json:"access_token"`
+ ExpiresIn int `json:"expires_in"`
+ IDToken string `json:"id_token"`
+ RefreshToken string `json:"refresh_token"`
+ TokenType string `json:"token_type"`
+}
+
+// AuthorizationCodeExchangeRequest is used to request the exchange of an
+// authorization code for a token
+type AuthorizationCodeExchangeRequest struct {
+ TokenEndpoint string
+ ClientID string
+ CodeVerifier string
+ Code string
+ RedirectURI string
+}
+
+// RefreshTokenExchangeRequest is used to request the exchange of a refresh
+// token for a refreshed token
+type RefreshTokenExchangeRequest struct {
+ TokenEndpoint string
+ ClientID string
+ RefreshToken string
+}
+
+// ClientCredentialsExchangeRequest is used to request the exchange of
+// client credentials for a token
+type ClientCredentialsExchangeRequest struct {
+ TokenEndpoint string
+ ClientID string
+ ClientSecret string
+ Audience string
+}
+
+// DeviceCodeExchangeRequest is used to request the exchange of
+// a device code for a token
+type DeviceCodeExchangeRequest struct {
+ TokenEndpoint string
+ ClientID string
+ DeviceCode string
+ PollInterval time.Duration
+}
+
+// TokenErrorResponse is used to parse error responses from the token endpoint
+type TokenErrorResponse struct {
+ Error string `json:"error"`
+ ErrorDescription string `json:"error_description"`
+}
+
+type TokenError struct {
+ ErrorCode string
+ ErrorDescription string
+}
+
+func (e *TokenError) Error() string {
+ if e.ErrorDescription != "" {
+ return fmt.Sprintf("%s (%s)", e.ErrorDescription, e.ErrorCode)
+ }
+ return e.ErrorCode
+}
+
+// HTTPAuthTransport abstracts how an HTTP exchange request is sent and received
+type HTTPAuthTransport interface {
+ Do(request *http.Request) (*http.Response, error)
+}
+
+// NewTokenRetriever creates a TokenRetriever that sends its requests through
+// the given HTTPAuthTransport
+func NewTokenRetriever(authTransport HTTPAuthTransport) *TokenRetriever {
+ return &TokenRetriever{
+ transport: authTransport,
+ }
+}
+
+// newExchangeCodeRequest builds an http.Request that posts the form-encoded
+// fields of an AuthorizationCodeExchangeRequest to its token endpoint
+func (ce *TokenRetriever) newExchangeCodeRequest(
+ req AuthorizationCodeExchangeRequest) (*http.Request, error) {
+ uv := url.Values{}
+ uv.Set("grant_type", "authorization_code")
+ uv.Set("client_id", req.ClientID)
+ uv.Set("code_verifier", req.CodeVerifier)
+ uv.Set("code", req.Code)
+ uv.Set("redirect_uri", req.RedirectURI)
+
+ euv := uv.Encode()
+
+ request, err := http.NewRequest("POST",
+ req.TokenEndpoint,
+ strings.NewReader(euv),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+ request.Header.Add("Content-Length", strconv.Itoa(len(euv)))
+
+ return request, nil
+}
+
+// newDeviceCodeExchangeRequest builds a new DeviceCodeExchangeRequest wrapped in an
+// http.Request
+func (ce *TokenRetriever) newDeviceCodeExchangeRequest(
+ req DeviceCodeExchangeRequest) (*http.Request, error) {
+ uv := url.Values{}
+ uv.Set("grant_type", "urn:ietf:params:oauth:grant-type:device_code")
+ uv.Set("client_id", req.ClientID)
+ uv.Set("device_code", req.DeviceCode)
+ euv := uv.Encode()
+
+ request, err := http.NewRequest("POST",
+ req.TokenEndpoint,
+ strings.NewReader(euv),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+ request.Header.Add("Content-Length", strconv.Itoa(len(euv)))
+
+ return request, nil
+}
+
+// newRefreshTokenRequest builds an http.Request that posts the form-encoded
+// fields of a RefreshTokenExchangeRequest to its token endpoint
+func (ce *TokenRetriever) newRefreshTokenRequest(req RefreshTokenExchangeRequest) (*http.Request, error) {
+ uv := url.Values{}
+ uv.Set("grant_type", "refresh_token")
+ uv.Set("client_id", req.ClientID)
+ uv.Set("refresh_token", req.RefreshToken)
+
+ euv := uv.Encode()
+
+ request, err := http.NewRequest("POST",
+ req.TokenEndpoint,
+ strings.NewReader(euv),
+ )
+ if err != nil {
+ return nil, err
+ }
+
+ request.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+ request.Header.Add("Content-Length", strconv.Itoa(len(euv)))
+
+ return request, nil
+}
+
+// newClientCredentialsRequest creates the form-encoded POST request that
+// presents a client ID, client secret and audience to req.TokenEndpoint using
+// the "client_credentials" grant type.
+func (ce *TokenRetriever) newClientCredentialsRequest(req ClientCredentialsExchangeRequest) (*http.Request, error) {
+	form := url.Values{
+		"grant_type":    {"client_credentials"},
+		"client_id":     {req.ClientID},
+		"client_secret": {req.ClientSecret},
+		"audience":      {req.Audience},
+	}
+	payload := form.Encode()
+
+	httpReq, err := http.NewRequest(http.MethodPost, req.TokenEndpoint, strings.NewReader(payload))
+	if err != nil {
+		return nil, err
+	}
+
+	// The advertised length is the byte length of the encoded form.
+	httpReq.Header.Set("Content-Type", "application/x-www-form-urlencoded")
+	httpReq.Header.Set("Content-Length", strconv.Itoa(len(payload)))
+	return httpReq, nil
+}
+
+// ExchangeCode trades the authorization code described by req for tokens: it
+// builds the token request, sends it through the configured transport and
+// parses the response.
+func (ce *TokenRetriever) ExchangeCode(req AuthorizationCodeExchangeRequest) (*TokenResult, error) {
+	httpReq, buildErr := ce.newExchangeCodeRequest(req)
+	if buildErr != nil {
+		return nil, buildErr
+	}
+
+	httpResp, sendErr := ce.transport.Do(httpReq)
+	if sendErr != nil {
+		return nil, sendErr
+	}
+	return ce.handleAuthTokensResponse(httpResp)
+}
+
+// handleAuthTokensResponse closes the response body and converts the response
+// into either a TokenResult or an error.
+//
+// For a non-2xx status whose Content-Type is JSON, the body is decoded as a
+// TokenErrorResponse and returned as a *TokenError; any other non-2xx response
+// yields a generic error carrying the status code. A 2xx body is decoded as an
+// AuthorizationTokenResponse; a decoding failure is returned unchanged.
+func (ce *TokenRetriever) handleAuthTokensResponse(resp *http.Response) (*TokenResult, error) {
+	if resp.Body != nil {
+		defer resp.Body.Close()
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode > 299 {
+		// Compare only the media type: servers commonly append parameters
+		// (e.g. "application/json; charset=utf-8") and media types are
+		// case-insensitive. An exact string match would drop the OAuth error
+		// code that callers such as ExchangeDeviceCode depend on.
+		mediaType := resp.Header.Get("Content-Type")
+		if i := strings.Index(mediaType, ";"); i >= 0 {
+			mediaType = mediaType[:i]
+		}
+		if resp.Body != nil && strings.EqualFold(strings.TrimSpace(mediaType), "application/json") {
+			er := TokenErrorResponse{}
+			if err := json.NewDecoder(resp.Body).Decode(&er); err != nil {
+				return nil, err
+			}
+			return nil, &TokenError{ErrorCode: er.Error, ErrorDescription: er.ErrorDescription}
+		}
+		return nil, fmt.Errorf("a non-success status code was received: %d", resp.StatusCode)
+	}
+
+	// Decoding a nil body would panic; report it as an error instead.
+	if resp.Body == nil {
+		return nil, fmt.Errorf("a success status code was received without a body: %d", resp.StatusCode)
+	}
+
+	atr := AuthorizationTokenResponse{}
+	if err := json.NewDecoder(resp.Body).Decode(&atr); err != nil {
+		return nil, err
+	}
+
+	return &TokenResult{
+		AccessToken:  atr.AccessToken,
+		IDToken:      atr.IDToken,
+		RefreshToken: atr.RefreshToken,
+		ExpiresIn:    atr.ExpiresIn,
+	}, nil
+}
+
+// ExchangeDeviceCode polls the token endpoint with the device code in req
+// until tokens are issued, the user denies the request, the device code
+// expires, the server reports any other error, or ctx is done (in which case
+// the error message is "cancelled").
+//
+// Polling follows RFC 8628 section 3.5: "authorization_pending" keeps polling
+// at the current interval, "slow_down" adds 5 seconds to the interval for all
+// subsequent requests, and every other error code ends the exchange.
+func (ce *TokenRetriever) ExchangeDeviceCode(ctx context.Context, req DeviceCodeExchangeRequest) (*TokenResult, error) {
+	interval := req.PollInterval
+	for {
+		request, err := ce.newDeviceCodeExchangeRequest(req)
+		if err != nil {
+			return nil, err
+		}
+
+		// Bind the request to ctx so that cancellation also interrupts a
+		// request that is in flight, not only the wait between polls.
+		response, err := ce.transport.Do(request.WithContext(ctx))
+		if err != nil {
+			if ctx.Err() != nil {
+				return nil, errors.New("cancelled")
+			}
+			return nil, err
+		}
+		token, err := ce.handleAuthTokensResponse(response)
+		if err == nil {
+			return token, nil
+		}
+		terr, ok := err.(*TokenError)
+		if !ok {
+			return nil, err
+		}
+		switch terr.ErrorCode {
+		case "expired_token":
+			// The user has not authorized the device quickly enough, so the device_code has expired.
+			return nil, fmt.Errorf("the device code has expired")
+		case "access_denied":
+			// The user refused to authorize the device
+			return nil, fmt.Errorf("the device was not authorized")
+		case "authorization_pending":
+			// Still waiting for the user to take action
+		case "slow_down":
+			// Polling too fast: back off for this and all later requests.
+			interval += 5 * time.Second
+		default:
+			// Any other error code (e.g. invalid_grant, invalid_client) will
+			// not resolve by polling again; surface it instead of looping
+			// forever.
+			return nil, terr
+		}
+
+		timer := time.NewTimer(interval)
+		select {
+		case <-timer.C:
+		case <-ctx.Done():
+			timer.Stop()
+			return nil, errors.New("cancelled")
+		}
+	}
+}
+
+// ExchangeRefreshToken trades the refresh token described by req for a fresh
+// set of tokens: it builds the token request, sends it through the configured
+// transport and parses the response.
+func (ce *TokenRetriever) ExchangeRefreshToken(req RefreshTokenExchangeRequest) (*TokenResult, error) {
+	httpReq, buildErr := ce.newRefreshTokenRequest(req)
+	if buildErr != nil {
+		return nil, buildErr
+	}
+
+	httpResp, sendErr := ce.transport.Do(httpReq)
+	if sendErr != nil {
+		return nil, sendErr
+	}
+	return ce.handleAuthTokensResponse(httpResp)
+}
+
+// ExchangeClientCredentials trades the client credentials described by req for
+// tokens: it builds the token request, sends it through the configured
+// transport and parses the response.
+func (ce *TokenRetriever) ExchangeClientCredentials(req ClientCredentialsExchangeRequest) (*TokenResult, error) {
+	httpReq, buildErr := ce.newClientCredentialsRequest(req)
+	if buildErr != nil {
+		return nil, buildErr
+	}
+
+	httpResp, sendErr := ce.transport.Do(httpReq)
+	if sendErr != nil {
+		return nil, sendErr
+	}
+	return ce.handleAuthTokensResponse(httpResp)
+}
diff --git a/oauth2/authorization_tokenretriever_test.go b/oauth2/authorization_tokenretriever_test.go
new file mode 100644
index 0000000..b08f878
--- /dev/null
+++ b/oauth2/authorization_tokenretriever_test.go
@@ -0,0 +1,338 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "bytes"
+ "context"
+ "encoding/json"
+ "io/ioutil"
+ "net/http"
+ "strings"
+ "time"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+// MockTransport is an HTTPAuthTransport test double. Each call to Do consumes
+// the next queued response; once the queue is empty, Do returns ReturnError.
+type MockTransport struct {
+	Responses   []*http.Response
+	ReturnError error
+}
+
+var _ HTTPAuthTransport = &MockTransport{}
+
+// Do pops and returns the first queued response, or (nil, ReturnError) when no
+// responses remain. The request itself is ignored.
+func (t *MockTransport) Do(req *http.Request) (*http.Response, error) {
+	if len(t.Responses) == 0 {
+		return nil, t.ReturnError
+	}
+	next, rest := t.Responses[0], t.Responses[1:]
+	t.Responses = rest
+	return next, nil
+}
+
+var _ = Describe("CodetokenExchanger", func() {
+	// Specs for the request built for the authorization-code grant.
+	Describe("newExchangeCodeRequest", func() {
+		It("creates the request", func() {
+			tokenRetriever := TokenRetriever{}
+			exchangeRequest := AuthorizationCodeExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				CodeVerifier:  "Verifier",
+				Code:          "code",
+				RedirectURI:   "https://redirect",
+			}
+
+			result, err := tokenRetriever.newExchangeCodeRequest(exchangeRequest)
+
+			// Assert on err before touching result: a failed build returns a
+			// nil request, and calling ParseForm on it would panic.
+			Expect(err).To(BeNil())
+			Expect(result.ParseForm()).To(Succeed())
+
+			Expect(result.FormValue("grant_type")).To(Equal("authorization_code"))
+			Expect(result.FormValue("client_id")).To(Equal("clientID"))
+			Expect(result.FormValue("code_verifier")).To(Equal("Verifier"))
+			Expect(result.FormValue("code")).To(Equal("code"))
+			Expect(result.FormValue("redirect_uri")).To(Equal("https://redirect"))
+			Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+
+			Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+			Expect(result.Header.Get("Content-Length")).To(Equal("117"))
+		})
+
+		It("returns an error when NewRequest returns an error", func() {
+			tokenRetriever := TokenRetriever{}
+
+			result, err := tokenRetriever.newExchangeCodeRequest(AuthorizationCodeExchangeRequest{
+				TokenEndpoint: "://issuer/oauth/token",
+			})
+
+			Expect(result).To(BeNil())
+			// net/url quotes the offending URL in its message since Go 1.14,
+			// so only the version-independent part is matched.
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		})
+	})
+
+	// Specs for turning raw token-endpoint responses into results or errors.
+	Describe("handleAuthTokensResponse", func() {
+		var retriever TokenRetriever
+
+		BeforeEach(func() {
+			retriever = TokenRetriever{}
+		})
+
+		It("handles the response", func() {
+			want := &TokenResult{
+				ExpiresIn:    1,
+				AccessToken:  "myAccessToken",
+				RefreshToken: "myRefreshToken",
+			}
+
+			got, err := retriever.handleAuthTokensResponse(buildResponse(200, AuthorizationTokenResponse{
+				ExpiresIn:    1,
+				AccessToken:  "myAccessToken",
+				RefreshToken: "myRefreshToken",
+			}))
+
+			Expect(err).To(BeNil())
+			Expect(got).To(Equal(want))
+		})
+
+		It("returns error when status code is not successful", func() {
+			got, err := retriever.handleAuthTokensResponse(buildResponse(500, nil))
+
+			Expect(got).To(BeNil())
+			Expect(err.Error()).To(Equal("a non-success status code was received: 500"))
+		})
+
+		It("returns typed error when response body contains error information", func() {
+			payload := TokenErrorResponse{Error: "test", ErrorDescription: "test description"}
+
+			got, err := retriever.handleAuthTokensResponse(buildResponse(400, payload))
+
+			Expect(got).To(BeNil())
+			Expect(err).To(Equal(&TokenError{ErrorCode: "test", ErrorDescription: "test description"}))
+			Expect(err.Error()).To(Equal("test description (test)"))
+		})
+
+		It("returns error when deserialization fails", func() {
+			// An empty JSON string is a valid 200 body but not a token object.
+			got, err := retriever.handleAuthTokensResponse(buildResponse(200, ""))
+
+			Expect(got).To(BeNil())
+			Expect(err.Error()).To(Equal(
+				"json: cannot unmarshal string into Go value of type oauth2.AuthorizationTokenResponse"))
+		})
+	})
+
+	// Specs for the request built for the refresh-token grant.
+	Describe("newRefreshTokenRequest", func() {
+		It("creates the request", func() {
+			tokenRetriever := TokenRetriever{}
+			exchangeRequest := RefreshTokenExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				RefreshToken:  "refreshToken",
+			}
+
+			result, err := tokenRetriever.newRefreshTokenRequest(exchangeRequest)
+
+			// Assert on err before touching result: a failed build returns a
+			// nil request, and calling ParseForm on it would panic.
+			Expect(err).To(BeNil())
+			Expect(result.ParseForm()).To(Succeed())
+
+			Expect(result.FormValue("grant_type")).To(Equal("refresh_token"))
+			Expect(result.FormValue("client_id")).To(Equal("clientID"))
+			Expect(result.FormValue("refresh_token")).To(Equal("refreshToken"))
+			Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+
+			Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+			Expect(result.Header.Get("Content-Length")).To(Equal("70"))
+		})
+
+		It("returns an error when NewRequest returns an error", func() {
+			tokenRetriever := TokenRetriever{}
+
+			result, err := tokenRetriever.newRefreshTokenRequest(RefreshTokenExchangeRequest{
+				TokenEndpoint: "://issuer/oauth/token",
+			})
+
+			Expect(result).To(BeNil())
+			// net/url quotes the offending URL in its message since Go 1.14,
+			// so only the version-independent part is matched.
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		})
+	})
+
+	// Specs for the request built for the client-credentials grant.
+	Describe("newClientCredentialsRequest", func() {
+		It("creates the request", func() {
+			tokenRetriever := TokenRetriever{}
+			exchangeRequest := ClientCredentialsExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				ClientSecret:  "clientSecret",
+				Audience:      "audience",
+			}
+
+			result, err := tokenRetriever.newClientCredentialsRequest(exchangeRequest)
+
+			// Assert on err before touching result: a failed build returns a
+			// nil request, and calling ParseForm on it would panic.
+			Expect(err).To(BeNil())
+			Expect(result.ParseForm()).To(Succeed())
+
+			Expect(result.FormValue("grant_type")).To(Equal("client_credentials"))
+			Expect(result.FormValue("client_id")).To(Equal("clientID"))
+			Expect(result.FormValue("client_secret")).To(Equal("clientSecret"))
+			Expect(result.FormValue("audience")).To(Equal("audience"))
+			Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+
+			Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+			Expect(result.Header.Get("Content-Length")).To(Equal("93"))
+		})
+
+		It("returns an error when NewRequest returns an error", func() {
+			tokenRetriever := TokenRetriever{}
+
+			result, err := tokenRetriever.newClientCredentialsRequest(ClientCredentialsExchangeRequest{
+				TokenEndpoint: "://issuer/oauth/token",
+			})
+
+			Expect(result).To(BeNil())
+			// net/url quotes the offending URL in its message since Go 1.14,
+			// so only the version-independent part is matched.
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		})
+	})
+
+	// Specs for the request built for the device-code grant.
+	Describe("newDeviceCodeExchangeRequest", func() {
+		It("creates the request", func() {
+			tokenRetriever := TokenRetriever{}
+			exchangeRequest := DeviceCodeExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				DeviceCode:    "deviceCode",
+				PollInterval:  time.Duration(5) * time.Second,
+			}
+
+			result, err := tokenRetriever.newDeviceCodeExchangeRequest(exchangeRequest)
+
+			// Assert on err before touching result: a failed build returns a
+			// nil request, and calling ParseForm on it would panic.
+			Expect(err).To(BeNil())
+			Expect(result.ParseForm()).To(Succeed())
+
+			Expect(result.FormValue("grant_type")).To(Equal("urn:ietf:params:oauth:grant-type:device_code"))
+			Expect(result.FormValue("client_id")).To(Equal("clientID"))
+			Expect(result.FormValue("device_code")).To(Equal("deviceCode"))
+			Expect(result.URL.String()).To(Equal("https://issuer/oauth/token"))
+
+			Expect(result.Header.Get("Content-Type")).To(Equal("application/x-www-form-urlencoded"))
+			Expect(result.Header.Get("Content-Length")).To(Equal("107"))
+		})
+
+		It("returns an error when NewRequest returns an error", func() {
+			tokenRetriever := TokenRetriever{}
+
+			// Exercise the device-code builder itself; this spec previously
+			// called newClientCredentialsRequest, leaving this error path
+			// untested.
+			result, err := tokenRetriever.newDeviceCodeExchangeRequest(DeviceCodeExchangeRequest{
+				TokenEndpoint: "://issuer/oauth/token",
+			})
+
+			Expect(result).To(BeNil())
+			// net/url quotes the offending URL in its message since Go 1.14,
+			// so only the version-independent part is matched.
+			Expect(err).To(HaveOccurred())
+			Expect(err.Error()).To(ContainSubstring("missing protocol scheme"))
+		})
+	})
+
+	// Specs for the device-code polling loop, driven by a MockTransport that
+	// replays queued responses.
+	Describe("ExchangeDeviceCode", func() {
+		var mockTransport *MockTransport
+		var tokenRetriever *TokenRetriever
+		var exchangeRequest DeviceCodeExchangeRequest
+		var tokenResult TokenResult
+
+		BeforeEach(func() {
+			mockTransport = &MockTransport{}
+			tokenRetriever = &TokenRetriever{
+				transport: mockTransport,
+			}
+			exchangeRequest = DeviceCodeExchangeRequest{
+				TokenEndpoint: "https://issuer/oauth/token",
+				ClientID:      "clientID",
+				DeviceCode:    "deviceCode",
+				PollInterval:  time.Duration(1) * time.Second,
+			}
+			tokenResult = TokenResult{
+				ExpiresIn:    1,
+				AccessToken:  "myAccessToken",
+				RefreshToken: "myRefreshToken",
+			}
+		})
+
+		It("returns a token", func() {
+			// The first poll succeeds, so no waiting is involved.
+			mockTransport.Responses = []*http.Response{
+				buildResponse(200, &tokenResult),
+			}
+			token, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+			Expect(err).To(BeNil())
+			Expect(token).To(Equal(&tokenResult))
+		})
+
+		It("supports cancellation", func() {
+			mockTransport.Responses = []*http.Response{
+				buildResponse(400, &TokenErrorResponse{"authorization_pending", ""}),
+			}
+			// The context is cancelled up front, so the wait after the first
+			// pending response must end immediately.
+			ctx, cancel := context.WithCancel(context.Background())
+			cancel()
+			_, err := tokenRetriever.ExchangeDeviceCode(ctx, exchangeRequest)
+			Expect(err).ToNot(BeNil())
+			Expect(err.Error()).To(Equal("cancelled"))
+		})
+
+		It("implements authorization_pending and slow_down", func() {
+			startTime := time.Now()
+			mockTransport.Responses = []*http.Response{
+				buildResponse(400, &TokenErrorResponse{"authorization_pending", ""}),
+				buildResponse(400, &TokenErrorResponse{"authorization_pending", ""}),
+				buildResponse(400, &TokenErrorResponse{"slow_down", ""}),
+				buildResponse(200, &tokenResult),
+			}
+			token, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+			Expect(err).To(BeNil())
+			Expect(token).To(Equal(&tokenResult))
+			// Three non-final responses mean at least three poll intervals elapsed.
+			endTime := time.Now()
+			Expect(endTime.Sub(startTime)).To(BeNumerically(">", exchangeRequest.PollInterval*3))
+		})
+
+		It("implements expired_token", func() {
+			mockTransport.Responses = []*http.Response{
+				buildResponse(400, &TokenErrorResponse{"expired_token", ""}),
+			}
+			_, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+			Expect(err).ToNot(BeNil())
+			Expect(err.Error()).To(Equal("the device code has expired"))
+		})
+
+		It("implements access_denied", func() {
+			mockTransport.Responses = []*http.Response{
+				buildResponse(400, &TokenErrorResponse{"access_denied", ""}),
+			}
+			_, err := tokenRetriever.ExchangeDeviceCode(context.Background(), exchangeRequest)
+			Expect(err).ToNot(BeNil())
+			Expect(err.Error()).To(Equal("the device was not authorized"))
+		})
+	})
+})
+
+// buildResponse fabricates an *http.Response with the given status code whose
+// body is the JSON encoding of body. The Content-Type header is set to
+// "application/json" only when that encoding starts with "{" (a JSON object).
+// It panics if body cannot be marshalled.
+func buildResponse(statusCode int, body interface{}) *http.Response {
+	payload, err := json.Marshal(body)
+	if err != nil {
+		panic(err)
+	}
+
+	header := http.Header{}
+	if strings.HasPrefix(string(payload), "{") {
+		header.Add("Content-Type", "application/json")
+	}
+
+	return &http.Response{
+		StatusCode: statusCode,
+		Header:     header,
+		Body:       ioutil.NopCloser(bytes.NewReader(payload)),
+	}
+}
diff --git a/oauth2/cache/cache.go b/oauth2/cache/cache.go
new file mode 100644
index 0000000..3d90bfa
--- /dev/null
+++ b/oauth2/cache/cache.go
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+ "fmt"
+ "sync"
+ "time"
+
+ "github.com/apache/pulsar-client-go/oauth2"
+ "github.com/apache/pulsar-client-go/oauth2/store"
+
+ xoauth2 "golang.org/x/oauth2"
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+// A CachingTokenSource is anything that can return a token, and is backed by a cache.
+// It extends xoauth2.TokenSource with a way to discard the cached token.
+type CachingTokenSource interface {
+ xoauth2.TokenSource
+
+ // InvalidateToken is called when the token is rejected by the resource server.
+ InvalidateToken() error
+}
+
+const (
+ // expiryDelta adjusts the token TTL to avoid using tokens which are almost expired:
+ // validateAccessToken rejects a token once the clock is within this margin of its expiry.
+ expiryDelta = time.Duration(60) * time.Second
+)
+
+// tokenCache implements a cache for the token associated with a specific audience.
+// It interacts with the store when the access token is near expiration or invalidated.
+// It is advisable to use a token cache instance per audience.
+type tokenCache struct {
+ // clock supplies the current time used when checking token expiry.
+ clock clock.Clock
+ // lock is held for the whole of Token and InvalidateToken.
+ lock sync.Mutex
+ // store is where grants are loaded from and saved to.
+ store store.Store
+ // audience is the key passed to the store's LoadGrant and SaveGrant.
+ audience string
+ // refresher obtains a fresh grant when the stored token is not usable.
+ refresher oauth2.AuthorizationGrantRefresher
+ // token is the most recently loaded or refreshed token; nil after InvalidateToken.
+ token *xoauth2.Token
+}
+
+// NewDefaultTokenCache returns a CachingTokenSource for the given audience that
+// reads and writes grants through store, refreshes them with refresher and
+// measures time with the real clock. The returned error is always nil.
+func NewDefaultTokenCache(store store.Store, audience string,
+	refresher oauth2.AuthorizationGrantRefresher) (CachingTokenSource, error) {
+	c := new(tokenCache)
+	c.clock = clock.RealClock{}
+	c.store = store
+	c.audience = audience
+	c.refresher = refresher
+	return c, nil
+}
+
+var _ CachingTokenSource = &tokenCache{}
+
+// Token returns a usable access token, trying in order: the in-memory token,
+// the grant held by the store, and finally a refresh through the refresher
+// (whose result is saved back to the store). The cache lock is held throughout.
+// Errors are prefixed with the step that failed (LoadGrant, RefreshGrant, SaveGrant).
+func (t *tokenCache) Token() (*xoauth2.Token, error) {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	// usable reports whether the currently cached token may be handed out.
+	usable := func() bool {
+		return t.token != nil && t.validateAccessToken(*t.token)
+	}
+
+	// first choice: the token already held in memory
+	if usable() {
+		return t.token, nil
+	}
+
+	// second choice: whatever the store currently holds
+	stored, loadErr := t.store.LoadGrant(t.audience)
+	if loadErr != nil {
+		return nil, fmt.Errorf("LoadGrant: %v", loadErr)
+	}
+	t.token = stored.Token
+	if usable() {
+		return t.token, nil
+	}
+
+	// last resort: refresh the grant, cache its token and persist it
+	refreshed, refreshErr := t.refresher.Refresh(stored)
+	if refreshErr != nil {
+		return nil, fmt.Errorf("RefreshGrant: %v", refreshErr)
+	}
+	t.token = refreshed.Token
+	if saveErr := t.store.SaveGrant(t.audience, *refreshed); saveErr != nil {
+		// TODO log rather than throw
+		return nil, fmt.Errorf("SaveGrant: %v", saveErr)
+	}
+
+	return t.token, nil
+}
+
+// InvalidateToken drops the in-memory access token (likely because the
+// resource server rejected it) and, if the store still holds that same access
+// token, marks the stored copy as expired. Only the expiry is rewritten, so
+// the rest of the stored grant, including any refresh token, is kept.
+func (t *tokenCache) InvalidateToken() error {
+	t.lock.Lock()
+	defer t.lock.Unlock()
+
+	stale := t.token
+	t.token = nil
+
+	// nothing was handed out earlier, so there is nothing to clear in the store
+	if stale == nil || stale.AccessToken == "" {
+		return nil
+	}
+
+	stored, err := t.store.LoadGrant(t.audience)
+	if err != nil {
+		return fmt.Errorf("LoadGrant: %v", err)
+	}
+
+	// leave the store alone if it has been updated since the token was returned
+	if stored.Token == nil || stored.Token.AccessToken != stale.AccessToken {
+		return nil
+	}
+
+	stored.Token.Expiry = time.Unix(0, 0).Add(expiryDelta)
+	if err = t.store.SaveGrant(t.audience, *stored); err != nil {
+		// TODO log rather than throw
+		return fmt.Errorf("SaveGrant: %v", err)
+	}
+	return nil
+}
+
+// validateAccessToken reports whether token may still be used: it must carry a
+// non-empty access token and, when it has a non-zero expiry, the clock must not
+// yet be past that expiry minus expiryDelta.
+func (t *tokenCache) validateAccessToken(token xoauth2.Token) bool {
+	switch {
+	case token.AccessToken == "":
+		return false
+	case token.Expiry.IsZero():
+		// a zero expiry is not checked against the clock
+		return true
+	}
+
+	deadline := token.Expiry.Round(0).Add(-expiryDelta)
+	return !t.clock.Now().After(deadline)
+}
diff --git a/oauth2/client_credentials_flow.go b/oauth2/client_credentials_flow.go
new file mode 100644
index 0000000..808b09b
--- /dev/null
+++ b/oauth2/client_credentials_flow.go
@@ -0,0 +1,158 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "net/http"
+
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+
+ "github.com/pkg/errors"
+)
+
+// ClientCredentialsFlow takes care of the mechanics needed for getting an access
+// token using the OAuth 2.0 "Client Credentials Flow".
+type ClientCredentialsFlow struct {
+ // options are the options the flow was created with.
+ options ClientCredentialsFlowOptions
+ // oidcWellKnownEndpoints supplies the token endpoint used by Authorize.
+ oidcWellKnownEndpoints OIDCWellKnownEndpoints
+ // keyfile holds the client credentials placed into the grant.
+ keyfile *KeyFile
+ // exchanger performs the credentials-for-token exchange.
+ exchanger ClientCredentialsExchanger
+ // clock is handed to the grant refresher created by Authorize.
+ clock clock.Clock
+}
+
+// ClientCredentialsProvider abstracts getting client credentials
+type ClientCredentialsProvider interface {
+ // GetClientCredentials returns the credentials as a KeyFile, or an error
+ // if they cannot be obtained.
+ GetClientCredentials() (*KeyFile, error)
+}
+
+// ClientCredentialsExchanger abstracts exchanging client credentials for tokens
+type ClientCredentialsExchanger interface {
+ // ExchangeClientCredentials performs the exchange described by req and
+ // returns the resulting tokens.
+ ExchangeClientCredentials(req ClientCredentialsExchangeRequest) (*TokenResult, error)
+}
+
+// ClientCredentialsFlowOptions configures a ClientCredentialsFlow.
+type ClientCredentialsFlowOptions struct {
+ // KeyFile is the path of the key file that NewDefaultClientCredentialsFlow
+ // loads the client credentials from.
+ KeyFile string
+ // AdditionalScopes is not read by the code in this file.
+ // NOTE(review): presumably extra OAuth scopes to request; confirm where it is consumed.
+ AdditionalScopes []string
+}
+
+// newClientCredentialsFlow assembles a ClientCredentialsFlow from explicitly
+// supplied dependencies, which lets tests substitute the exchanger and clock.
+func newClientCredentialsFlow(
+	options ClientCredentialsFlowOptions,
+	keyfile *KeyFile,
+	oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+	exchanger ClientCredentialsExchanger,
+	clock clock.Clock) *ClientCredentialsFlow {
+	flow := new(ClientCredentialsFlow)
+	flow.options = options
+	flow.oidcWellKnownEndpoints = oidcWellKnownEndpoints
+	flow.keyfile = keyfile
+	flow.exchanger = exchanger
+	flow.clock = clock
+	return flow
+}
+
+// NewDefaultClientCredentialsFlow builds a client credentials flow with the
+// default wiring: credentials are read from the key file named in options, the
+// OIDC well-known endpoints are looked up from the issuer URL in that key
+// file, tokens are exchanged over a plain http.Client, and the real clock is
+// used.
+func NewDefaultClientCredentialsFlow(options ClientCredentialsFlowOptions) (*ClientCredentialsFlow, error) {
+	keyFile, err := NewClientCredentialsProviderFromKeyFile(options.KeyFile).GetClientCredentials()
+	if err != nil {
+		return nil, errors.Wrap(err, "could not get client credentials")
+	}
+
+	endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(keyFile.IssuerURL)
+	if err != nil {
+		return nil, err
+	}
+
+	exchanger := NewTokenRetriever(&http.Client{})
+	flow := newClientCredentialsFlow(options, keyFile, *endpoints, exchanger, clock.RealClock{})
+	return flow, nil
+}
+
+var _ Flow = &ClientCredentialsFlow{}
+
+// Authorize obtains a client-credentials grant for audience. It builds a
+// token-less grant from the key file and the token endpoint, then refreshes it
+// straight away, which both tests the credentials and yields the initial
+// access token.
+func (c *ClientCredentialsFlow) Authorize(audience string) (*AuthorizationGrant, error) {
+	refresher := &ClientCredentialsGrantRefresher{
+		exchanger: c.exchanger,
+		clock:     c.clock,
+	}
+
+	authorized, err := refresher.Refresh(&AuthorizationGrant{
+		Type:              GrantTypeClientCredentials,
+		Audience:          audience,
+		ClientID:          c.keyfile.ClientID,
+		ClientCredentials: c.keyfile,
+		TokenEndpoint:     c.oidcWellKnownEndpoints.TokenEndpoint,
+	})
+	if err != nil {
+		return nil, errors.Wrap(err, "authentication failed using client credentials")
+	}
+	return authorized, nil
+}
+
+// ClientCredentialsGrantRefresher refreshes client-credentials grants by
+// exchanging the grant's credentials for a new token.
+type ClientCredentialsGrantRefresher struct {
+ // exchanger performs the credentials-for-token exchange.
+ exchanger ClientCredentialsExchanger
+ // clock is passed to convertToOAuth2Token when converting the exchange result.
+ clock clock.Clock
+}
+
+// NewDefaultClientCredentialsGrantRefresher returns a refresher that exchanges
+// credentials over a plain http.Client and uses the given clock. The returned
+// error is always nil.
+func NewDefaultClientCredentialsGrantRefresher(clock clock.Clock) (*ClientCredentialsGrantRefresher, error) {
+	refresher := new(ClientCredentialsGrantRefresher)
+	refresher.exchanger = NewTokenRetriever(&http.Client{})
+	refresher.clock = clock
+	return refresher, nil
+}
+
+var _ AuthorizationGrantRefresher = &ClientCredentialsGrantRefresher{}
+
+// Refresh exchanges the client credentials stored in grant for a new token and
+// returns a new grant carrying that token; the input grant is not modified.
+//
+// It returns an error when grant is nil, is not a client-credentials grant,
+// has no client credentials attached, or when the exchange itself fails.
+func (g *ClientCredentialsGrantRefresher) Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error) {
+	if grant == nil {
+		return nil, errors.New("no grant to refresh")
+	}
+	if grant.Type != GrantTypeClientCredentials {
+		return nil, errors.New("unsupported grant type")
+	}
+	// Without this guard a grant lacking credentials would cause a
+	// nil-pointer panic below instead of a reportable error.
+	if grant.ClientCredentials == nil {
+		return nil, errors.New("grant has no client credentials")
+	}
+
+	exchangeRequest := ClientCredentialsExchangeRequest{
+		TokenEndpoint: grant.TokenEndpoint,
+		Audience:      grant.Audience,
+		ClientID:      grant.ClientCredentials.ClientID,
+		ClientSecret:  grant.ClientCredentials.ClientSecret,
+	}
+	tr, err := g.exchanger.ExchangeClientCredentials(exchangeRequest)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not exchange client credentials")
+	}
+
+	token := convertToOAuth2Token(tr, g.clock)
+	return &AuthorizationGrant{
+		Type:              GrantTypeClientCredentials,
+		Audience:          grant.Audience,
+		ClientID:          grant.ClientID,
+		ClientCredentials: grant.ClientCredentials,
+		TokenEndpoint:     grant.TokenEndpoint,
+		Token:             &token,
+	}, nil
+}
diff --git a/oauth2/client_credentials_flow_test.go b/oauth2/client_credentials_flow_test.go
new file mode 100644
index 0000000..6e123ce
--- /dev/null
+++ b/oauth2/client_credentials_flow_test.go
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "errors"
+ "time"
+
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+ "github.com/apache/pulsar-client-go/oauth2/clock/testing"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+// MockClientCredentialsProvider is a ClientCredentialsProvider test double that
+// returns canned values and records whether it was invoked.
+type MockClientCredentialsProvider struct {
+	Called                  bool
+	ClientCredentialsResult *KeyFile
+	ReturnsError            error
+}
+
+// GetClientCredentials marks the mock as called and returns the configured
+// result and error.
+func (m *MockClientCredentialsProvider) GetClientCredentials() (*KeyFile, error) {
+	creds, err := m.ClientCredentialsResult, m.ReturnsError
+	m.Called = true
+	return creds, err
+}
+
+var _ ClientCredentialsProvider = &MockClientCredentialsProvider{}
+
+// clientCredentials is the key-file fixture shared by the specs in this file.
+var clientCredentials = KeyFile{
+ Type: KeyFileTypeServiceAccount,
+ ClientID: "test_clientID",
+ ClientSecret: "test_clientSecret",
+ ClientEmail: "test_clientEmail",
+ IssuerURL: "http://issuer",
+}
+
+// Specs for ClientCredentialsFlow.Authorize, run against a fake clock and a
+// mock token exchanger.
+var _ = Describe("ClientCredentialsFlow", func() {
+	Describe("Authorize", func() {
+		const audience = "test_audience"
+
+		var (
+			mockClock          clock.Clock
+			mockTokenExchanger *MockTokenExchanger
+		)
+
+		// newFlow wires a flow to whatever the mocks hold when it is called.
+		newFlow := func() *ClientCredentialsFlow {
+			return newClientCredentialsFlow(
+				ClientCredentialsFlowOptions{KeyFile: "test_keyfile"},
+				&clientCredentials,
+				oidcEndpoints,
+				mockTokenExchanger,
+				mockClock,
+			)
+		}
+
+		BeforeEach(func() {
+			mockClock = testing.NewFakeClock(time.Unix(0, 0))
+			mockTokenExchanger = &MockTokenExchanger{
+				ReturnsTokens: &TokenResult{
+					AccessToken:  "accessToken",
+					RefreshToken: "refreshToken",
+					ExpiresIn:    1234,
+				},
+			}
+		})
+
+		It("invokes TokenExchanger with credentials", func() {
+			_, err := newFlow().Authorize(audience)
+
+			Expect(err).ToNot(HaveOccurred())
+			Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+				ClientID:      clientCredentials.ClientID,
+				ClientSecret:  clientCredentials.ClientSecret,
+				Audience:      audience,
+			}))
+		})
+
+		It("returns TokensResult from TokenExchanger", func() {
+			grant, err := newFlow().Authorize(audience)
+
+			Expect(err).ToNot(HaveOccurred())
+			want := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+			Expect(*grant.Token).To(Equal(want))
+		})
+
+		It("returns an error if token exchanger errors", func() {
+			mockTokenExchanger.ReturnsError = errors.New("someerror")
+			mockTokenExchanger.ReturnsTokens = nil
+
+			_, err := newFlow().Authorize(audience)
+
+			// both wrapping layers must show up in the message
+			Expect(err.Error()).To(Equal("authentication failed using client credentials: " +
+				"could not exchange client credentials: someerror"))
+		})
+	})
+})
+
+// Specs for ClientCredentialsGrantRefresher.Refresh, run against a fake clock
+// and a mock token exchanger.
+var _ = Describe("ClientCredentialsGrantRefresher", func() {
+	Describe("Refresh", func() {
+		var (
+			mockClock          clock.Clock
+			mockTokenExchanger *MockTokenExchanger
+			refresher          *ClientCredentialsGrantRefresher
+			og                 *AuthorizationGrant
+		)
+
+		BeforeEach(func() {
+			mockClock = testing.NewFakeClock(time.Unix(0, 0))
+			mockTokenExchanger = &MockTokenExchanger{
+				ReturnsTokens: &TokenResult{
+					AccessToken:  "accessToken",
+					RefreshToken: "refreshToken",
+					ExpiresIn:    1234,
+				},
+			}
+			refresher = &ClientCredentialsGrantRefresher{
+				clock:     mockClock,
+				exchanger: mockTokenExchanger,
+			}
+			// the original grant carries credentials but no token and no ClientID
+			og = &AuthorizationGrant{
+				Type:              GrantTypeClientCredentials,
+				Audience:          "test_audience",
+				ClientCredentials: &clientCredentials,
+				TokenEndpoint:     oidcEndpoints.TokenEndpoint,
+			}
+		})
+
+		It("invokes TokenExchanger with credentials", func() {
+			_, err := refresher.Refresh(og)
+
+			Expect(err).ToNot(HaveOccurred())
+			Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&ClientCredentialsExchangeRequest{
+				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+				ClientID:      clientCredentials.ClientID,
+				ClientSecret:  clientCredentials.ClientSecret,
+				Audience:      og.Audience,
+			}))
+		})
+
+		It("returns a valid grant", func() {
+			ng, err := refresher.Refresh(og)
+
+			Expect(err).ToNot(HaveOccurred())
+			Expect(ng.Audience).To(Equal("test_audience"))
+			Expect(ng.ClientID).To(Equal(""))
+			Expect(*ng.ClientCredentials).To(Equal(clientCredentials))
+			Expect(ng.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+			want := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+			Expect(*ng.Token).To(Equal(want))
+		})
+	})
+})
diff --git a/oauth2/client_credentials_provider.go b/oauth2/client_credentials_provider.go
new file mode 100644
index 0000000..2716376
--- /dev/null
+++ b/oauth2/client_credentials_provider.go
@@ -0,0 +1,66 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "encoding/json"
+ "fmt"
+ "io/ioutil"
+)
+
+ const (
+ // KeyFileTypeServiceAccount is the only KeyFile.Type value accepted by
+ // KeyFileProvider.GetClientCredentials.
+ KeyFileTypeServiceAccount = "sn_service_account"
+ )
+
+ // KeyFileProvider supplies client credentials by reading a JSON key file.
+ type KeyFileProvider struct {
+ // KeyFile is the path of the key file; it is read on every call to
+ // GetClientCredentials.
+ KeyFile string
+ }
+
+ // KeyFile is the JSON document decoded from a key file.
+ type KeyFile struct {
+ // Type must equal KeyFileTypeServiceAccount for the file to be accepted.
+ Type string `json:"type"`
+ ClientID string `json:"client_id"`
+ ClientSecret string `json:"client_secret"`
+ ClientEmail string `json:"client_email"`
+ IssuerURL string `json:"issuer_url"`
+ }
+
+ // NewClientCredentialsProviderFromKeyFile returns a KeyFileProvider for the
+ // key file at the given path. The path is only stored here; nothing is read
+ // from it by this function.
+ func NewClientCredentialsProviderFromKeyFile(keyFile string) *KeyFileProvider {
+ 	provider := new(KeyFileProvider)
+ 	provider.KeyFile = keyFile
+ 	return provider
+ }
+
+ // Compile-time check that *KeyFileProvider satisfies ClientCredentialsProvider.
+ var _ ClientCredentialsProvider = &KeyFileProvider{}
+
+ // GetClientCredentials reads the file at k.KeyFile, decodes it as JSON into a
+ // KeyFile and returns it. Read and decode errors are returned unchanged. A
+ // file whose "type" is not KeyFileTypeServiceAccount is rejected with an
+ // "unsupported format" error.
+ func (k *KeyFileProvider) GetClientCredentials() (*KeyFile, error) {
+ 	raw, err := ioutil.ReadFile(k.KeyFile)
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	parsed := new(KeyFile)
+ 	if err := json.Unmarshal(raw, parsed); err != nil {
+ 		return nil, err
+ 	}
+
+ 	if parsed.Type == KeyFileTypeServiceAccount {
+ 		return parsed, nil
+ 	}
+ 	return nil, fmt.Errorf("open %s: unsupported format", k.KeyFile)
+ }
diff --git a/oauth2/clock/clock.go b/oauth2/clock/clock.go
new file mode 100644
index 0000000..f170d9a
--- /dev/null
+++ b/oauth2/clock/clock.go
@@ -0,0 +1,98 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package clock
+
+import "time"
+
+ // Clock allows for injecting fake or real clocks into code that
+ // needs to do arbitrary things based on time.
+ //
+ // The method set mirrors the package-level functions of the time package
+ // with the same names; RealClock forwards each one to its time counterpart.
+ type Clock interface {
+ // Now returns the clock's current time.
+ Now() time.Time
+ // Since returns the time elapsed on this clock since the given instant.
+ Since(time.Time) time.Duration
+ // After returns a channel that receives a time once d has passed.
+ After(d time.Duration) <-chan time.Time
+ // NewTimer returns a Timer that fires once d has passed.
+ NewTimer(d time.Duration) Timer
+ // Sleep is the clock's counterpart of time.Sleep.
+ Sleep(d time.Duration)
+ // Tick returns a channel that receives a time every d.
+ Tick(d time.Duration) <-chan time.Time
+ }
+
+ // Compile-time check that RealClock implements Clock.
+ var _ = Clock(RealClock{})
+
+ // RealClock really calls time.Now()
+ //
+ // It is a stateless Clock whose methods forward directly to the time package.
+ type RealClock struct{}
+
+ // Now returns the current time.
+ func (RealClock) Now() time.Time {
+ return time.Now()
+ }
+
+ // Since returns time since the specified timestamp.
+ func (RealClock) Since(ts time.Time) time.Duration {
+ return time.Since(ts)
+ }
+
+ // After is the same as time.After(d).
+ func (RealClock) After(d time.Duration) <-chan time.Time {
+ return time.After(d)
+ }
+
+ // NewTimer is the same as time.NewTimer(d)
+ // The returned Timer wraps the *time.Timer in a realTimer.
+ func (RealClock) NewTimer(d time.Duration) Timer {
+ return &realTimer{
+ timer: time.NewTimer(d),
+ }
+ }
+
+ // Tick is the same as time.Tick(d)
+ // NOTE(review): time.Tick gives the caller no way to stop the underlying
+ // ticker (see the time.Tick documentation), so this suits only callers that
+ // need the ticks for the life of the process.
+ func (RealClock) Tick(d time.Duration) <-chan time.Time {
+ return time.Tick(d)
+ }
+
+ // Sleep is the same as time.Sleep(d)
+ func (RealClock) Sleep(d time.Duration) {
+ time.Sleep(d)
+ }
+
+ // Timer allows for injecting fake or real timers into code that
+ // needs to do arbitrary things based on time.
+ type Timer interface {
+ // C returns the channel on which the timer delivers its firing time.
+ C() <-chan time.Time
+ // Stop stops the timer; realTimer returns whatever time.Timer.Stop returns.
+ Stop() bool
+ // Reset re-arms the timer to fire after d; realTimer returns whatever
+ // time.Timer.Reset returns.
+ Reset(d time.Duration) bool
+ }
+
+ // Compile-time check that *realTimer implements Timer.
+ var _ = Timer(&realTimer{})
+
+ // realTimer is backed by an actual time.Timer.
+ // It adapts *time.Timer to the Timer interface, exposing the C field as a method.
+ type realTimer struct {
+ timer *time.Timer
+ }
+
+ // C returns the underlying timer's channel.
+ func (r *realTimer) C() <-chan time.Time {
+ return r.timer.C
+ }
+
+ // Stop calls Stop() on the underlying timer.
+ func (r *realTimer) Stop() bool {
+ return r.timer.Stop()
+ }
+
+ // Reset calls Reset() on the underlying timer.
+ func (r *realTimer) Reset(d time.Duration) bool {
+ return r.timer.Reset(d)
+ }
diff --git a/oauth2/clock/testing/fake_clock.go b/oauth2/clock/testing/fake_clock.go
new file mode 100644
index 0000000..6fcbf4c
--- /dev/null
+++ b/oauth2/clock/testing/fake_clock.go
@@ -0,0 +1,275 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package testing
+
+import (
+ "sync"
+ "time"
+
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+ // Compile-time checks that both fake clocks implement clock.Clock.
+ var (
+ _ = clock.Clock(&FakeClock{})
+ _ = clock.Clock(&IntervalClock{})
+ )
+
+ // FakeClock implements clock.Clock, but returns an arbitrary time.
+ // The time only changes through Step, SetTime or Sleep.
+ type FakeClock struct {
+ // lock guards time and waiters.
+ lock sync.RWMutex
+ time time.Time
+
+ // waiters are waiting for the fake time to pass their specified time
+ waiters []*fakeClockWaiter
+ }
+
+ // fakeClockWaiter is one pending After, NewTimer or Tick registration.
+ type fakeClockWaiter struct {
+ // targetTime is the fake time at or after which the waiter fires.
+ targetTime time.Time
+ // stepInterval, when positive, makes the waiter periodic: after firing it
+ // is rescheduled by this interval instead of being dropped (set by Tick).
+ stepInterval time.Duration
+ // skipIfBlocked makes the send to destChan non-blocking, so a tick is
+ // dropped when the channel is already full (set by Tick).
+ skipIfBlocked bool
+ // destChan receives the fake time at which the waiter fired.
+ destChan chan time.Time
+ // fired records that a value was delivered; fakeTimer.Stop and
+ // fakeTimer.Reset read it to compute their return value.
+ fired bool
+ }
+
+ // NewFakeClock returns a FakeClock whose current time is t and which has no
+ // waiters registered.
+ func NewFakeClock(t time.Time) *FakeClock {
+ 	fake := new(FakeClock)
+ 	fake.time = t
+ 	return fake
+ }
+
+ // Now reports the fake clock's current time, read under the read lock.
+ func (f *FakeClock) Now() time.Time {
+ 	f.lock.RLock()
+ 	current := f.time
+ 	f.lock.RUnlock()
+ 	return current
+ }
+
+ // Since reports how far the fake clock's current time is past ts.
+ func (f *FakeClock) Since(ts time.Time) time.Duration {
+ 	f.lock.RLock()
+ 	elapsed := f.time.Sub(ts)
+ 	f.lock.RUnlock()
+ 	return elapsed
+ }
+
+ // After is the fake counterpart of time.After(d): it registers a one-shot
+ // waiter due at the current fake time plus d and returns its channel.
+ func (f *FakeClock) After(d time.Duration) <-chan time.Time {
+ 	f.lock.Lock()
+ 	defer f.lock.Unlock()
+
+ 	waiter := &fakeClockWaiter{
+ 		targetTime: f.time.Add(d),
+ 		// Capacity 1 so the send made when the waiter fires does not block.
+ 		destChan: make(chan time.Time, 1),
+ 	}
+ 	f.waiters = append(f.waiters, waiter)
+ 	return waiter.destChan
+ }
+
+ // NewTimer is the fake counterpart of time.NewTimer(d). The returned timer's
+ // embedded waiter is registered with the clock, due at the current fake time
+ // plus d.
+ func (f *FakeClock) NewTimer(d time.Duration) clock.Timer {
+ 	f.lock.Lock()
+ 	defer f.lock.Unlock()
+
+ 	created := &fakeTimer{fakeClock: f}
+ 	created.waiter.targetTime = f.time.Add(d)
+ 	// Capacity 1 so the send made when the timer fires does not block.
+ 	created.waiter.destChan = make(chan time.Time, 1)
+
+ 	// Register the waiter embedded in the timer itself so that Stop and Reset
+ 	// can find it again by address.
+ 	f.waiters = append(f.waiters, &created.waiter)
+ 	return created
+ }
+
+ // Tick is the fake counterpart of time.Tick. It returns nil for a
+ // non-positive d; otherwise it registers a periodic waiter whose first tick
+ // is due at the current fake time plus d and returns its channel.
+ func (f *FakeClock) Tick(d time.Duration) <-chan time.Time {
+ 	if d <= 0 {
+ 		return nil
+ 	}
+
+ 	f.lock.Lock()
+ 	defer f.lock.Unlock()
+
+ 	ticker := &fakeClockWaiter{
+ 		targetTime:    f.time.Add(d),
+ 		stepInterval:  d,
+ 		skipIfBlocked: true,
+ 		// Holds one tick; further ticks are dropped until it is read.
+ 		destChan: make(chan time.Time, 1),
+ 	}
+ 	f.waiters = append(f.waiters, ticker)
+ 	return ticker.destChan
+ }
+
+ // Step advances the fake time by d and fires every waiter (After, Tick or
+ // NewTimer) that has become due.
+ func (f *FakeClock) Step(d time.Duration) {
+ 	f.lock.Lock()
+ 	defer f.lock.Unlock()
+
+ 	advanced := f.time.Add(d)
+ 	f.setTimeLocked(advanced)
+ }
+
+ // SetTime sets the time.
+ // Waiters whose target time is not after t are fired, exactly as with Step.
+ func (f *FakeClock) SetTime(t time.Time) {
+ f.lock.Lock()
+ defer f.lock.Unlock()
+ f.setTimeLocked(t)
+ }
+
+ // Actually changes the time and checks any waiters. f must be write-locked.
+ //
+ // Every waiter whose targetTime is not after t is fired with t. One-shot
+ // waiters are then dropped from f.waiters; periodic waiters (stepInterval > 0)
+ // are rescheduled and kept. Waiters that are not yet due are kept unchanged.
+ func (f *FakeClock) setTimeLocked(t time.Time) {
+ f.time = t
+ newWaiters := make([]*fakeClockWaiter, 0, len(f.waiters))
+ for i := range f.waiters {
+ w := f.waiters[i]
+ // Due when targetTime <= t.
+ if !w.targetTime.After(t) {
+
+ if w.skipIfBlocked {
+ // Non-blocking send: if the channel is full the tick is dropped
+ // and fired is left untouched.
+ select {
+ case w.destChan <- t:
+ w.fired = true
+ default:
+ }
+ } else {
+ // NOTE(review): this send blocks while f.lock is held if destChan
+ // already holds an unread value.
+ w.destChan <- t
+ w.fired = true
+ }
+
+ if w.stepInterval > 0 {
+ // Skip over every interval already covered by t so the waiter
+ // fires at most once per call, then keep it registered.
+ for !w.targetTime.After(t) {
+ w.targetTime = w.targetTime.Add(w.stepInterval)
+ }
+ newWaiters = append(newWaiters, w)
+ }
+
+ } else {
+ newWaiters = append(newWaiters, f.waiters[i])
+ }
+ }
+ f.waiters = newWaiters
+ }
+
+ // HasWaiters reports whether any waiter is still registered with f, which
+ // lets tests synchronise with code that is about to block on the clock.
+ // Waiters are registered by After, NewTimer and Tick; a Tick waiter is
+ // rescheduled rather than removed when it fires.
+ func (f *FakeClock) HasWaiters() bool {
+ 	f.lock.RLock()
+ 	pending := len(f.waiters)
+ 	f.lock.RUnlock()
+ 	return pending > 0
+ }
+
+ // Sleep is akin to time.Sleep
+ // It is implemented as Step(d): the fake time is advanced by d and due
+ // waiters are fired.
+ func (f *FakeClock) Sleep(d time.Duration) {
+ f.Step(d)
+ }
+
+ // IntervalClock implements clock.Clock, but each invocation of Now steps the clock forward the specified duration
+ // NOTE(review): Time is mutated by Now without any synchronization.
+ type IntervalClock struct {
+ // Time is the clock's current time; Now advances it.
+ Time time.Time
+ // Duration is the amount added to Time by every call to Now.
+ Duration time.Duration
+ }
+
+ // Now advances i's time by Duration and returns the new time.
+ func (i *IntervalClock) Now() time.Time {
+ i.Time = i.Time.Add(i.Duration)
+ return i.Time
+ }
+
+ // Since returns time since the time in i.
+ // Unlike Now, it does not advance the clock.
+ func (i *IntervalClock) Since(ts time.Time) time.Duration {
+ return i.Time.Sub(ts)
+ }
+
+ // After is unimplemented, will panic.
+ // TODO: make interval clock use FakeClock so this can be implemented.
+ func (*IntervalClock) After(d time.Duration) <-chan time.Time {
+ panic("IntervalClock doesn't implement After")
+ }
+
+ // NewTimer is unimplemented, will panic.
+ // TODO: make interval clock use FakeClock so this can be implemented.
+ func (*IntervalClock) NewTimer(d time.Duration) clock.Timer {
+ panic("IntervalClock doesn't implement NewTimer")
+ }
+
+ // Tick is unimplemented, will panic.
+ // TODO: make interval clock use FakeClock so this can be implemented.
+ func (*IntervalClock) Tick(d time.Duration) <-chan time.Time {
+ panic("IntervalClock doesn't implement Tick")
+ }
+
+ // Sleep is unimplemented, will panic.
+ func (*IntervalClock) Sleep(d time.Duration) {
+ panic("IntervalClock doesn't implement Sleep")
+ }
+
+ // Compile-time check that *fakeTimer implements clock.Timer.
+ var _ = clock.Timer(&fakeTimer{})
+
+ // fakeTimer implements clock.Timer based on a FakeClock.
+ type fakeTimer struct {
+ // fakeClock is the clock that owns the waiter list and the lock guarding it.
+ fakeClock *FakeClock
+ // waiter is registered with fakeClock by address, so Stop and Reset can
+ // find it again with a pointer comparison.
+ waiter fakeClockWaiter
+ }
+
+ // C returns the channel that notifies when this timer has fired.
+ func (f *fakeTimer) C() <-chan time.Time {
+ return f.waiter.destChan
+ }
+
+ // Stop unregisters the timer's waiter from its clock. It returns true if the
+ // timer had not fired yet and false otherwise.
+ func (f *fakeTimer) Stop() bool {
+ 	owner := f.fakeClock
+ 	owner.lock.Lock()
+ 	defer owner.lock.Unlock()
+
+ 	self := &f.waiter
+ 	remaining := make([]*fakeClockWaiter, 0, len(owner.waiters))
+ 	for _, candidate := range owner.waiters {
+ 		if candidate == self {
+ 			continue
+ 		}
+ 		remaining = append(remaining, candidate)
+ 	}
+ 	owner.waiters = remaining
+
+ 	return !self.fired
+ }
+
+// Reset resets the timer to the fake clock's "now" + d. It returns true if the timer has not yet
+// fired, or false otherwise.
+func (f *fakeTimer) Reset(d time.Duration) bool {
+ f.fakeClock.lock.Lock()
+ defer f.fakeClock.lock.Unlock()
+
+ active := !f.waiter.fired
+
+ f.waiter.fired = false
+ f.waiter.targetTime = f.fakeClock.time.Add(d)
+
+ var isWaiting bool
+ for i := range f.fakeClock.waiters {
+ w := f.fakeClock.waiters[i]
+ if w == &f.waiter {
+ isWaiting = true
+ break
+ }
+ }
+ if !isWaiting {
+ f.fakeClock.waiters = append(f.fakeClock.waiters, &f.waiter)
+ }
+
+ return active
+}
diff --git a/oauth2/config_tokenprovider.go b/oauth2/config_tokenprovider.go
new file mode 100644
index 0000000..627749f
--- /dev/null
+++ b/oauth2/config_tokenprovider.go
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import "fmt"
+
+ // configProvider is the storage backend used by ConfigBackedCachingProvider:
+ // it loads and saves an access/refresh token pair under a string identifier.
+ type configProvider interface {
+ // GetTokens returns the access token and refresh token, in that order.
+ GetTokens(identifier string) (string, string)
+ SaveTokens(identifier, accessToken, refreshToken string)
+ }
+
+ // ConfigBackedCachingProvider wraps a configProvider in order to conform to
+ // the cachingProvider interface
+ type ConfigBackedCachingProvider struct {
+ // identifier is the key passed to every configProvider call.
+ identifier string
+ config configProvider
+ }
+
+ // NewConfigBackedCachingProvider returns a caching provider that stores its
+ // tokens in config under the identifier "<clientID>-<audience>".
+ func NewConfigBackedCachingProvider(clientID, audience string, config configProvider) *ConfigBackedCachingProvider {
+ 	provider := &ConfigBackedCachingProvider{config: config}
+ 	provider.identifier = fmt.Sprintf("%s-%s", clientID, audience)
+ 	return provider
+ }
+
+ // GetTokens loads the access and refresh tokens stored under the provider's
+ // identifier and returns them in a TokenResult. The returned error is always
+ // nil.
+ func (c *ConfigBackedCachingProvider) GetTokens() (*TokenResult, error) {
+ 	cached := new(TokenResult)
+ 	cached.AccessToken, cached.RefreshToken = c.config.GetTokens(c.identifier)
+ 	return cached, nil
+ }
+
+ // CacheTokens stores the access token and refresh token from toCache in the
+ // configProvider under the provider's identifier.
+ //
+ // It returns an error, without touching the configProvider, when toCache is
+ // nil; otherwise it returns nil.
+ func (c *ConfigBackedCachingProvider) CacheTokens(toCache *TokenResult) error {
+ 	if toCache == nil {
+ 		return fmt.Errorf("cannot cache tokens: token result is nil")
+ 	}
+ 	c.config.SaveTokens(c.identifier, toCache.AccessToken, toCache.RefreshToken)
+ 	return nil
+ }
diff --git a/oauth2/config_tokenprovider_test.go b/oauth2/config_tokenprovider_test.go
new file mode 100644
index 0000000..d949a5a
--- /dev/null
+++ b/oauth2/config_tokenprovider_test.go
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+ // mockConfigProvider is a configProvider test double: it returns canned
+ // tokens and records the arguments it was called with.
+ type mockConfigProvider struct {
+ // ReturnAccessToken and ReturnRefreshToken are returned by GetTokens.
+ ReturnAccessToken string
+ ReturnRefreshToken string
+ // GetTokensCalledIdentifier is the identifier of the last GetTokens call.
+ GetTokensCalledIdentifier string
+ // SavedIdentifier, SavedAccessToken and SavedRefreshToken are the
+ // arguments of the last SaveTokens call.
+ SavedIdentifier string
+ SavedAccessToken string
+ SavedRefreshToken string
+ }
+
+ // GetTokens records identifier and returns the canned token pair.
+ func (m *mockConfigProvider) GetTokens(identifier string) (string, string) {
+ m.GetTokensCalledIdentifier = identifier
+ return m.ReturnAccessToken, m.ReturnRefreshToken
+ }
+
+ // SaveTokens records its three arguments.
+ func (m *mockConfigProvider) SaveTokens(identifier, accessToken, refreshToken string) {
+ m.SavedIdentifier = identifier
+ m.SavedAccessToken = accessToken
+ m.SavedRefreshToken = refreshToken
+ }
+
+ // Specs for ConfigBackedCachingProvider, run against mockConfigProvider.
+ var _ = Describe("main", func() {
+ 	Describe("configCachingProvider", func() {
+ 		It("sets up the identifier using the clientID and audience", func() {
+ 			p := NewConfigBackedCachingProvider("iamclientid", "iamaudience", &mockConfigProvider{})
+
+ 			Expect(p.identifier).To(Equal("iamclientid-iamaudience"))
+ 		})
+
+ 		It("gets tokens from the config provider", func() {
+ 			c := &mockConfigProvider{
+ 				ReturnAccessToken:  "accessToken",
+ 				ReturnRefreshToken: "refreshToken",
+ 			}
+ 			p := ConfigBackedCachingProvider{
+ 				identifier: "iamidentifier",
+ 				config:     c,
+ 			}
+
+ 			r, err := p.GetTokens()
+
+ 			Expect(err).NotTo(HaveOccurred())
+ 			Expect(c.GetTokensCalledIdentifier).To(Equal(p.identifier))
+ 			Expect(r).To(Equal(&TokenResult{
+ 				AccessToken:  c.ReturnAccessToken,
+ 				RefreshToken: c.ReturnRefreshToken,
+ 			}))
+ 		})
+
+ 		It("caches the tokens in the config provider", func() {
+ 			c := &mockConfigProvider{}
+ 			p := ConfigBackedCachingProvider{
+ 				identifier: "iamidentifier",
+ 				config:     c,
+ 			}
+ 			toSave := &TokenResult{
+ 				AccessToken:  "accessToken",
+ 				RefreshToken: "refreshToken",
+ 			}
+
+ 			// The returned error is part of the contract; do not drop it.
+ 			err := p.CacheTokens(toSave)
+
+ 			Expect(err).NotTo(HaveOccurred())
+ 			Expect(c.SavedIdentifier).To(Equal(p.identifier))
+ 			Expect(c.SavedAccessToken).To(Equal(toSave.AccessToken))
+ 			Expect(c.SavedRefreshToken).To(Equal(toSave.RefreshToken))
+ 		})
+ 	})
+ })
diff --git a/oauth2/device_code_flow.go b/oauth2/device_code_flow.go
new file mode 100644
index 0000000..486fdfa
--- /dev/null
+++ b/oauth2/device_code_flow.go
@@ -0,0 +1,203 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "context"
+ "fmt"
+ "net/http"
+ "time"
+
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+
+ "github.com/pkg/errors"
+)
+
+ // DeviceCodeFlow takes care of the mechanics needed for getting an access
+ // token using the OAuth 2.0 "Device Code Flow"
+ type DeviceCodeFlow struct {
+ options DeviceCodeFlowOptions
+ // oidcWellKnownEndpoints supplies the token endpoint used for the exchange.
+ oidcWellKnownEndpoints OIDCWellKnownEndpoints
+ // codeProvider obtains the device code at the start of Authorize.
+ codeProvider DeviceCodeProvider
+ // exchanger trades the device code for tokens.
+ exchanger DeviceTokenExchanger
+ // callback, when non-nil, is handed the device code result before the
+ // exchange starts.
+ callback DeviceCodeCallback
+ // clock is passed to convertToOAuth2Token when building the token.
+ clock clock.Clock
+ }
+
+ // DeviceCodeProvider abstracts getting a device code
+ type DeviceCodeProvider interface {
+ GetCode(audience string, additionalScopes ...string) (*DeviceCodeResult, error)
+ }
+
+ // DeviceTokenExchanger abstracts exchanging for tokens
+ type DeviceTokenExchanger interface {
+ ExchangeDeviceCode(ctx context.Context, req DeviceCodeExchangeRequest) (*TokenResult, error)
+ ExchangeRefreshToken(req RefreshTokenExchangeRequest) (*TokenResult, error)
+ }
+
+ // DeviceCodeCallback is invoked by DeviceCodeFlow.Authorize with the device
+ // code result before the token exchange starts; a non-nil error aborts
+ // Authorize and is returned to its caller.
+ type DeviceCodeCallback func(code *DeviceCodeResult) error
+
+ // DeviceCodeFlowOptions configures a DeviceCodeFlow.
+ type DeviceCodeFlowOptions struct {
+ // IssuerEndpoint is the issuer URL from which NewDefaultDeviceCodeFlow
+ // resolves the OIDC well-known endpoints.
+ IssuerEndpoint string
+ // ClientID is sent with the device code exchange and recorded on the grant.
+ ClientID string
+ // AdditionalScopes are passed to the DeviceCodeProvider when requesting a code.
+ AdditionalScopes []string
+ // AllowRefresh adds the "offline_access" scope to the code request.
+ AllowRefresh bool
+ }
+
+ // newDeviceCodeFlow assembles a DeviceCodeFlow from explicitly supplied
+ // collaborators, storing each argument in the field of the same name.
+ func newDeviceCodeFlow(
+ 	options DeviceCodeFlowOptions,
+ 	oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+ 	codeProvider DeviceCodeProvider,
+ 	exchanger DeviceTokenExchanger,
+ 	callback DeviceCodeCallback,
+ 	clock clock.Clock) *DeviceCodeFlow {
+ 	flow := new(DeviceCodeFlow)
+ 	flow.options = options
+ 	flow.oidcWellKnownEndpoints = oidcWellKnownEndpoints
+ 	flow.codeProvider = codeProvider
+ 	flow.exchanger = exchanger
+ 	flow.callback = callback
+ 	flow.clock = clock
+ 	return flow
+ }
+
+ // NewDefaultDeviceCodeFlow provides an easy way to build up a default
+ // device code flow with all the correct configuration: the OIDC well-known
+ // endpoints are resolved from options.IssuerEndpoint, device codes come from
+ // a LocalDeviceCodeProvider, tokens from a TokenRetriever, and time from the
+ // real clock. If refresh tokens should be allowed, set options.AllowRefresh.
+ //
+ // callback may be nil. An error is returned only when the well-known
+ // endpoints cannot be resolved.
+ func NewDefaultDeviceCodeFlow(options DeviceCodeFlowOptions,
+ 	callback DeviceCodeCallback) (*DeviceCodeFlow, error) {
+ 	wellKnownEndpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(options.IssuerEndpoint)
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	// One client serves both collaborators; two zero-value clients would use
+ 	// the same default transport anyway.
+ 	httpClient := &http.Client{}
+
+ 	codeProvider := NewLocalDeviceCodeProvider(
+ 		LocalDeviceCodeProviderOptions{
+ 			ClientID: options.ClientID,
+ 		},
+ 		*wellKnownEndpoints,
+ 		httpClient,
+ 	)
+
+ 	tokenRetriever := NewTokenRetriever(httpClient)
+
+ 	return newDeviceCodeFlow(
+ 		options,
+ 		*wellKnownEndpoints,
+ 		codeProvider,
+ 		tokenRetriever,
+ 		callback,
+ 		clock.RealClock{}), nil
+ }
+
+ // Compile-time check that *DeviceCodeFlow implements Flow.
+ var _ Flow = &DeviceCodeFlow{}
+
+ // Authorize runs the device code flow for the given audience: it requests a
+ // device code, hands it to the callback (if one is set), exchanges the code
+ // for tokens and returns the resulting grant.
+ //
+ // Errors from the code provider and the callback are returned unchanged;
+ // exchange errors are wrapped with "could not exchange code".
+ func (p *DeviceCodeFlow) Authorize(audience string) (*AuthorizationGrant, error) {
+ 	scopes := append([]string(nil), p.options.AdditionalScopes...)
+ 	if p.options.AllowRefresh {
+ 		// offline_access asks the issuer for a refresh token.
+ 		scopes = append(scopes, "offline_access")
+ 	}
+
+ 	codeResult, err := p.codeProvider.GetCode(audience, scopes...)
+ 	if err != nil {
+ 		return nil, err
+ 	}
+
+ 	if notify := p.callback; notify != nil {
+ 		if err := notify(codeResult); err != nil {
+ 			return nil, err
+ 		}
+ 	}
+
+ 	tokenEndpoint := p.oidcWellKnownEndpoints.TokenEndpoint
+ 	clientID := p.options.ClientID
+
+ 	tr, err := p.exchanger.ExchangeDeviceCode(context.Background(), DeviceCodeExchangeRequest{
+ 		TokenEndpoint: tokenEndpoint,
+ 		ClientID:      clientID,
+ 		DeviceCode:    codeResult.DeviceCode,
+ 		PollInterval:  time.Duration(codeResult.Interval) * time.Second,
+ 	})
+ 	if err != nil {
+ 		return nil, errors.Wrap(err, "could not exchange code")
+ 	}
+
+ 	token := convertToOAuth2Token(tr, p.clock)
+ 	return &AuthorizationGrant{
+ 		Type:          GrantTypeDeviceCode,
+ 		Audience:      audience,
+ 		ClientID:      clientID,
+ 		TokenEndpoint: tokenEndpoint,
+ 		Token:         &token,
+ 	}, nil
+ }
+
+ // DeviceAuthorizationGrantRefresher refreshes grants that were obtained
+ // through the device code flow.
+ type DeviceAuthorizationGrantRefresher struct {
+ 	exchanger DeviceTokenExchanger
+ 	clock     clock.Clock
+ }
+
+ // NewDefaultDeviceAuthorizationGrantRefresher returns a refresher that
+ // exchanges refresh tokens through a TokenRetriever backed by a fresh
+ // http.Client and uses the given clock when converting token results. The
+ // returned error is always nil.
+ func NewDefaultDeviceAuthorizationGrantRefresher(clock clock.Clock) (*DeviceAuthorizationGrantRefresher, error) {
+ 	refresher := &DeviceAuthorizationGrantRefresher{clock: clock}
+ 	refresher.exchanger = NewTokenRetriever(&http.Client{})
+ 	return refresher, nil
+ }
+
+ // Compile-time check that *DeviceAuthorizationGrantRefresher implements
+ // AuthorizationGrantRefresher.
+ var _ AuthorizationGrantRefresher = &DeviceAuthorizationGrantRefresher{}
+
+ // Refresh exchanges the grant's refresh token for new tokens and returns a
+ // new grant carrying them; the grant passed in is not modified.
+ //
+ // It fails when the grant is not a device code grant, when the grant holds no
+ // refresh token, or when the exchange fails (wrapped with "could not exchange
+ // refresh token").
+ func (g *DeviceAuthorizationGrantRefresher) Refresh(grant *AuthorizationGrant) (*AuthorizationGrant, error) {
+ 	if grant.Type != GrantTypeDeviceCode {
+ 		return nil, errors.New("unsupported grant type")
+ 	}
+ 	if current := grant.Token; current == nil || current.RefreshToken == "" {
+ 		return nil, fmt.Errorf("the authorization grant has expired (no refresh token); please re-login")
+ 	}
+ 	previousRefreshToken := grant.Token.RefreshToken
+
+ 	tr, err := g.exchanger.ExchangeRefreshToken(RefreshTokenExchangeRequest{
+ 		TokenEndpoint: grant.TokenEndpoint,
+ 		ClientID:      grant.ClientID,
+ 		RefreshToken:  previousRefreshToken,
+ 	})
+ 	if err != nil {
+ 		return nil, errors.Wrap(err, "could not exchange refresh token")
+ 	}
+
+ 	// RFC 6749 Section 1.5 - token exchange MAY issue a new refresh token (otherwise the result is blank).
+ 	// also see: https://tools.ietf.org/html/draft-ietf-oauth-security-topics-13#section-4.12
+ 	// When none is issued, carry the previous refresh token forward.
+ 	if tr.RefreshToken == "" {
+ 		tr.RefreshToken = previousRefreshToken
+ 	}
+
+ 	token := convertToOAuth2Token(tr, g.clock)
+ 	refreshed := &AuthorizationGrant{
+ 		Type:          GrantTypeDeviceCode,
+ 		Audience:      grant.Audience,
+ 		ClientID:      grant.ClientID,
+ 		Token:         &token,
+ 		TokenEndpoint: grant.TokenEndpoint,
+ 	}
+ 	return refreshed, nil
+ }
diff --git a/oauth2/device_code_flow_test.go b/oauth2/device_code_flow_test.go
new file mode 100644
index 0000000..a238a48
--- /dev/null
+++ b/oauth2/device_code_flow_test.go
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "errors"
+ "time"
+
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+ "github.com/apache/pulsar-client-go/oauth2/clock/testing"
+ "golang.org/x/oauth2"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+ // MockDeviceCodeProvider is a DeviceCodeProvider test double: it records the
+ // arguments of the last GetCode call and returns canned values.
+ type MockDeviceCodeProvider struct {
+ // Called is set to true by GetCode.
+ Called bool
+ CalledWithAudience string
+ CalledWithAdditionalScopes []string
+ // DeviceCodeResult and ReturnsError are returned by GetCode as-is.
+ DeviceCodeResult *DeviceCodeResult
+ ReturnsError error
+ }
+
+ // GetCode records its arguments and returns the canned result and error.
+ func (cp *MockDeviceCodeProvider) GetCode(audience string, additionalScopes ...string) (*DeviceCodeResult, error) {
+ cp.Called = true
+ cp.CalledWithAudience = audience
+ cp.CalledWithAdditionalScopes = additionalScopes
+ return cp.DeviceCodeResult, cp.ReturnsError
+ }
+
+ // MockDeviceCodeCallback is a test double whose Callback method can be used
+ // as a DeviceCodeCallback: it records the code it was given and returns a
+ // canned error.
+ type MockDeviceCodeCallback struct {
+ 	Called           bool
+ 	DeviceCodeResult *DeviceCodeResult
+ 	ReturnsError     error
+ }
+
+ // Callback marks the mock as called, stores code and returns ReturnsError
+ // (nil when none was configured).
+ func (c *MockDeviceCodeCallback) Callback(code *DeviceCodeResult) error {
+ 	c.Called, c.DeviceCodeResult = true, code
+ 	return c.ReturnsError
+ }
+
+ // Specs for DeviceCodeFlow.Authorize, run against mock collaborators and a
+ // fake clock pinned at the Unix epoch.
+ var _ = Describe("DeviceCodeFlow", func() {
+
+ Describe("Authorize", func() {
+ // NOTE(review): the audience deliberately or accidentally equals the
+ // client ID used below, so these specs cannot tell the two apart.
+ const audience = "test_clientID"
+
+ var mockClock clock.Clock
+ var mockCodeProvider *MockDeviceCodeProvider
+ var mockTokenExchanger *MockTokenExchanger
+ var mockCallback *MockDeviceCodeCallback
+ var flow *DeviceCodeFlow
+
+ // Rebuild every collaborator and the flow before each spec so that
+ // recorded calls do not leak between specs.
+ BeforeEach(func() {
+ mockClock = testing.NewFakeClock(time.Unix(0, 0))
+
+ mockCodeProvider = &MockDeviceCodeProvider{
+ DeviceCodeResult: &DeviceCodeResult{
+ DeviceCode: "test_deviceCode",
+ UserCode: "test_userCode",
+ VerificationURI: "http://verification_uri",
+ VerificationURIComplete: "http://verification_uri_complete",
+ ExpiresIn: 10,
+ Interval: 5,
+ },
+ }
+
+ expectedTokens := TokenResult{AccessToken: "accessToken", RefreshToken: "refreshToken", ExpiresIn: 1234}
+ mockTokenExchanger = &MockTokenExchanger{
+ ReturnsTokens: &expectedTokens,
+ }
+
+ mockCallback = &MockDeviceCodeCallback{}
+
+ opts := DeviceCodeFlowOptions{
+ IssuerEndpoint: "http://issuer",
+ ClientID: "test_clientID",
+ AdditionalScopes: nil,
+ AllowRefresh: true,
+ }
+ flow = newDeviceCodeFlow(
+ opts,
+ oidcEndpoints,
+ mockCodeProvider,
+ mockTokenExchanger,
+ mockCallback.Callback,
+ mockClock,
+ )
+ })
+
+ // AllowRefresh is true above, so "offline_access" must be requested.
+ It("invokes DeviceCodeProvider", func() {
+ _, _ = flow.Authorize(audience)
+ Expect(mockCodeProvider.Called).To(BeTrue())
+ Expect(mockCodeProvider.CalledWithAdditionalScopes).To(ContainElement("offline_access"))
+ })
+
+ It("invokes callback with returned code", func() {
+ _, _ = flow.Authorize(audience)
+ Expect(mockCallback.Called).To(BeTrue())
+ Expect(mockCallback.DeviceCodeResult).To(Equal(mockCodeProvider.DeviceCodeResult))
+ })
+
+ // Interval: 5 in the canned device code becomes a 5 second poll interval.
+ It("invokes TokenExchanger with returned code", func() {
+ _, _ = flow.Authorize(audience)
+ Expect(mockTokenExchanger.CalledWithRequest).To(Equal(&DeviceCodeExchangeRequest{
+ TokenEndpoint: oidcEndpoints.TokenEndpoint,
+ ClientID: "test_clientID",
+ PollInterval: time.Duration(5) * time.Second,
+ DeviceCode: "test_deviceCode",
+ }))
+ })
+
+ It("returns an authorization grant", func() {
+ grant, _ := flow.Authorize(audience)
+ Expect(grant).ToNot(BeNil())
+ Expect(grant.Audience).To(Equal(audience))
+ Expect(grant.ClientID).To(Equal("test_clientID"))
+ Expect(grant.ClientCredentials).To(BeNil())
+ Expect(grant.TokenEndpoint).To(Equal(oidcEndpoints.TokenEndpoint))
+ expected := convertToOAuth2Token(mockTokenExchanger.ReturnsTokens, mockClock)
+ Expect(*grant.Token).To(Equal(expected))
+ })
+ })
+ })
+
+ // Specs for DeviceAuthorizationGrantRefresher.Refresh, run against a mock
+ // token exchanger and a fake clock pinned at the Unix epoch.
+ //
+ // Every spec checks the error returned by Refresh before touching the result,
+ // so a regression shows up as an assertion failure rather than a nil-pointer
+ // panic.
+ var _ = Describe("DeviceAuthorizationGrantRefresher", func() {
+
+ 	Describe("Refresh", func() {
+ 		var mockClock clock.Clock
+ 		var mockTokenExchanger *MockTokenExchanger
+ 		var refresher *DeviceAuthorizationGrantRefresher
+ 		var grant *AuthorizationGrant
+
+ 		BeforeEach(func() {
+ 			mockClock = testing.NewFakeClock(time.Unix(0, 0))
+
+ 			mockTokenExchanger = &MockTokenExchanger{}
+
+ 			refresher = &DeviceAuthorizationGrantRefresher{
+ 				exchanger: mockTokenExchanger,
+ 				clock:     mockClock,
+ 			}
+
+ 			token := oauth2.Token{AccessToken: "gat", RefreshToken: "grt", Expiry: time.Unix(1, 0)}
+ 			grant = &AuthorizationGrant{
+ 				Type:          GrantTypeDeviceCode,
+ 				ClientID:      "test_clientID",
+ 				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+ 				Token:         &token,
+ 			}
+ 		})
+
+ 		It("invokes the token exchanger", func() {
+ 			mockTokenExchanger.ReturnsTokens = &TokenResult{
+ 				AccessToken: "new token",
+ 			}
+
+ 			_, err := refresher.Refresh(grant)
+ 			Expect(err).ToNot(HaveOccurred())
+ 			// Compare pointers so an exchanger that was never called fails
+ 			// the assertion instead of panicking on a nil dereference.
+ 			Expect(mockTokenExchanger.RefreshCalledWithRequest).To(Equal(&RefreshTokenExchangeRequest{
+ 				TokenEndpoint: oidcEndpoints.TokenEndpoint,
+ 				ClientID:      grant.ClientID,
+ 				RefreshToken:  "grt",
+ 			}))
+ 		})
+
+ 		It("returns the refreshed access token from the TokenExchanger", func() {
+ 			mockTokenExchanger.ReturnsTokens = &TokenResult{
+ 				AccessToken: "new token",
+ 			}
+
+ 			refreshed, err := refresher.Refresh(grant)
+ 			Expect(err).ToNot(HaveOccurred())
+ 			Expect(refreshed.Token.AccessToken).To(Equal(mockTokenExchanger.ReturnsTokens.AccessToken))
+ 		})
+
+ 		It("preserves the existing refresh token from the TokenExchanger", func() {
+ 			mockTokenExchanger.ReturnsTokens = &TokenResult{
+ 				AccessToken: "new token",
+ 			}
+
+ 			refreshed, err := refresher.Refresh(grant)
+ 			Expect(err).ToNot(HaveOccurred())
+ 			Expect(refreshed.Token.RefreshToken).To(Equal("grt"))
+ 		})
+
+ 		It("returns the refreshed refresh token from the TokenExchanger", func() {
+ 			mockTokenExchanger.ReturnsTokens = &TokenResult{
+ 				AccessToken:  "new token",
+ 				RefreshToken: "new token",
+ 			}
+
+ 			refreshed, err := refresher.Refresh(grant)
+ 			Expect(err).ToNot(HaveOccurred())
+ 			Expect(refreshed.Token.RefreshToken).To(Equal("new token"))
+ 		})
+
+ 		It("returns a meaningful expiration time", func() {
+ 			mockTokenExchanger.ReturnsTokens = &TokenResult{
+ 				AccessToken: "new token",
+ 				ExpiresIn:   60,
+ 			}
+
+ 			refreshed, err := refresher.Refresh(grant)
+ 			Expect(err).ToNot(HaveOccurred())
+ 			Expect(refreshed.Token.Expiry).To(Equal(mockClock.Now().Add(time.Duration(60) * time.Second)))
+ 		})
+
+ 		It("returns an error when TokenExchanger does", func() {
+ 			mockTokenExchanger.ReturnsError = errors.New("someerror")
+
+ 			_, err := refresher.Refresh(grant)
+ 			// MatchError fails cleanly on a nil error, unlike err.Error().
+ 			Expect(err).To(MatchError("could not exchange refresh token: someerror"))
+ 		})
+ 	})
+ })
diff --git a/oauth2/device_code_provider.go b/oauth2/device_code_provider.go
new file mode 100644
index 0000000..23b226b
--- /dev/null
+++ b/oauth2/device_code_provider.go
@@ -0,0 +1,133 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "encoding/json"
+ "fmt"
+ "net/http"
+ "net/url"
+ "strconv"
+ "strings"
+)
+
+// LocalDeviceCodeProvider holds the information needed to easily get a
+// device code locally: the provider options, the OIDC well-known
+// endpoints, and the HTTP transport used to send the request.
+type LocalDeviceCodeProvider struct {
+ options LocalDeviceCodeProviderOptions
+ oidcWellKnownEndpoints OIDCWellKnownEndpoints
+ transport HTTPAuthTransport
+}
+
+// DeviceCodeRequest holds the parameters that are form-encoded into the body
+// of a device authorization request (client_id, scope and audience).
+type DeviceCodeRequest struct {
+ ClientID string
+ Scopes []string
+ Audience string
+}
+
+// DeviceCodeResult holds the device code obtained from the device
+// authorization endpoint. It is populated by decoding the JSON response body.
+type DeviceCodeResult struct {
+ DeviceCode string `json:"device_code"`
+ UserCode string `json:"user_code"`
+ VerificationURI string `json:"verification_uri"`
+ VerificationURIComplete string `json:"verification_uri_complete"`
+ ExpiresIn int `json:"expires_in"`
+ Interval int `json:"interval"`
+}
+
+// LocalDeviceCodeProviderOptions holds the settings of a
+// LocalDeviceCodeProvider.
+type LocalDeviceCodeProviderOptions struct {
+ // ClientID is sent as the client_id of every device code request.
+ ClientID string
+}
+
+// NewLocalDeviceCodeProvider builds a LocalDeviceCodeProvider from the given
+// options, OIDC well-known endpoints and HTTP transport.
+func NewLocalDeviceCodeProvider(
+	options LocalDeviceCodeProviderOptions,
+	oidcWellKnownEndpoints OIDCWellKnownEndpoints,
+	authTransport HTTPAuthTransport) *LocalDeviceCodeProvider {
+	provider := LocalDeviceCodeProvider{
+		options:                options,
+		oidcWellKnownEndpoints: oidcWellKnownEndpoints,
+		transport:              authTransport,
+	}
+	return &provider
+}
+
+// GetCode requests a new device code for the given audience. The "openid"
+// and "email" scopes are always requested; any additionalScopes are appended
+// after them.
+func (cp *LocalDeviceCodeProvider) GetCode(audience string, additionalScopes ...string) (*DeviceCodeResult, error) {
+	scopes := append([]string{"openid", "email"}, additionalScopes...)
+
+	httpReq, err := cp.newDeviceCodeRequest(&DeviceCodeRequest{
+		ClientID: cp.options.ClientID,
+		Scopes:   scopes,
+		Audience: audience,
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	httpResp, err := cp.transport.Do(httpReq)
+	if err != nil {
+		return nil, err
+	}
+
+	// The response handler yields a nil result alongside any error, so its
+	// return values can be passed straight through.
+	return cp.handleDeviceCodeResponse(httpResp)
+}
+
+// newDeviceCodeRequest wraps the given DeviceCodeRequest in a form-encoded
+// POST http.Request aimed at the device authorization endpoint.
+func (cp *LocalDeviceCodeProvider) newDeviceCodeRequest(
+	req *DeviceCodeRequest) (*http.Request, error) {
+	form := url.Values{
+		"client_id": {req.ClientID},
+		"scope":     {strings.Join(req.Scopes, " ")},
+		"audience":  {req.Audience},
+	}
+	body := form.Encode()
+
+	httpReq, err := http.NewRequest(
+		"POST",
+		cp.oidcWellKnownEndpoints.DeviceAuthorizationEndpoint,
+		strings.NewReader(body),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	httpReq.Header.Add("Content-Type", "application/x-www-form-urlencoded")
+	httpReq.Header.Add("Content-Length", strconv.Itoa(len(body)))
+
+	return httpReq, nil
+}
+
+// handleDeviceCodeResponse decodes the JSON body of a device authorization
+// response into a DeviceCodeResult.
+//
+// A status code outside the 2xx range is reported as an error, as is a body
+// that cannot be decoded. The response body is closed on every path,
+// including the non-success one, so the underlying connection is not leaked.
+func (cp *LocalDeviceCodeProvider) handleDeviceCodeResponse(resp *http.Response) (*DeviceCodeResult, error) {
+	// Register the close before any early return; tolerate responses that
+	// were built without a body.
+	if resp.Body != nil {
+		defer resp.Body.Close()
+	}
+
+	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
+		return nil, fmt.Errorf("a non-success status code was received: %d", resp.StatusCode)
+	}
+
+	dcr := DeviceCodeResult{}
+	if err := json.NewDecoder(resp.Body).Decode(&dcr); err != nil {
+		return nil, err
+	}
+
+	return &dcr, nil
+}
diff --git a/oauth2/go.mod b/oauth2/go.mod
new file mode 100644
index 0000000..d185ef0
--- /dev/null
+++ b/oauth2/go.mod
@@ -0,0 +1,12 @@
+module github.com/apache/pulsar-client-go/oauth2
+
+go 1.13
+
+require (
+ github.com/99designs/keyring v1.1.5
+ github.com/dgrijalva/jwt-go v3.2.0+incompatible
+ github.com/onsi/ginkgo v1.14.0
+ github.com/onsi/gomega v1.10.1
+ github.com/pkg/errors v0.9.1
+ golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
+)
diff --git a/oauth2/go.sum b/oauth2/go.sum
new file mode 100644
index 0000000..5063d2f
--- /dev/null
+++ b/oauth2/go.sum
@@ -0,0 +1,113 @@
+cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
+github.com/99designs/keyring v1.1.5 h1:wLv7QyzYpFIyMSwOADq1CLTF9KbjbBfcnfmOGJ64aO4=
+github.com/99designs/keyring v1.1.5/go.mod h1:7hsVvt2qXgtadGevGJ4ujg+u8m6SpJ5TpHqTozIPqf0=
+github.com/danieljoos/wincred v1.0.2 h1:zf4bhty2iLuwgjgpraD2E9UbvO+fe54XXGJbOwe23fU=
+github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
+github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
+github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible h1:7qlOGliEKZXTDg6OTjfoBKDXWrumCAMpl/TFQ4/5kLM=
+github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a h1:mq+R6XEM6lJX5VlLyZIrUSP8tSuJp82xTK89hvBwJbU=
+github.com/dvsekhvalnov/jose2go v0.0.0-20180829124132-7f401d37b68a/go.mod h1:7BvyPhdbLxMXIYTFPLsyJRFMsKmOZnQmzh6Gb+uquuM=
+github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
+github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
+github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
+github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2 h1:ZpnhV/YsD2/4cESfV5+Hoeu/iUR3ruzNvZ+yQfO03a0=
+github.com/godbus/dbus v0.0.0-20190726142602-4481cbc300e2/go.mod h1:bBOAhwG1umN6/6ZUMtDFBMQR8jRg9O75tm9K00oMsK4=
+github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
+github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
+github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
+github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
+github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
+github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
+github.com/golang/protobuf v1.4.2 h1:+Z5KGCizgyZCbGh1KZqA0fcLLkwbsjIzS4aV2v7wJX0=
+github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
+github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/google/go-cmp v0.4.0 h1:xsAVV57WRhGj6kEIi8ReJzQlHHqcBYCElAvkovg3B/4=
+github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c h1:6rhixN/i8ZofjG1Y75iExal34USq5p+wiN1tpie8IrU=
+github.com/gsterjov/go-libsecret v0.0.0-20161001094733-a6f4afe4910c/go.mod h1:NMPJylDgVpX0MLRlPy15sqSwOFv/U1GZ2m21JhFfek0=
+github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d h1:Z+RDyXzjKE0i2sTjZ/b1uxiGtPhFy34Ou/Tk0qwN0kM=
+github.com/keybase/go-keychain v0.0.0-20190712205309-48d3d31d256d/go.mod h1:JJNrCn9otv/2QP4D7SMJBgaleKpOf66PnW6F5WGNRIc=
+github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI=
+github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo=
+github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
+github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE=
+github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
+github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y=
+github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
+github.com/mtibben/percent v0.2.1 h1:5gssi8Nqo8QU/r2pynCm+hBQHpkB/uNK7BJCFogWdzs=
+github.com/mtibben/percent v0.2.1/go.mod h1:KG9uO+SZkUp+VkRHsCdYQV3XSZrrSpR3O9ibNBTZrns=
+github.com/nxadm/tail v1.4.4 h1:DQuhQpB1tVlglWS2hLQ5OV6B5r8aGxSrPc5Qo6uTN78=
+github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
+github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
+github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
+github.com/onsi/ginkgo v1.14.0 h1:2mOpI4JVVPBN+WQRa0WKH2eXR+Ey+uK4n7Zj0aYpIQA=
+github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
+github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
+github.com/onsi/gomega v1.10.1 h1:o0+MgICZLuZ7xjH7Vx6zS/zcu93/BEp1VwkIW1mEXCE=
+github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
+github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
+github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
+github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
+github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk=
+github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
+github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
+github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
+github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7 h1:AeiKBIuRw3UomYXSbLy0Mc2dDLfdtbT/IVn4keq83P0=
+golang.org/x/net v0.0.0-20200520004742-59133d7f0dd7/go.mod h1:qpuaurCH72eLCgpAm/N6yyVIVM9cpaDIP3A8BGJEC5A=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d h1:TzXSXBo42m9gQenoE3b9BGiEpg5IG2JkU5FkPIawgtw=
+golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
+golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190712062909-fae7ac547cb7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190904154756-749cb33beabd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20191120155948-bd437916bb0e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299 h1:DYfZAGf2WMFjMxbgTjaC+2HC7NkNAQs+6Q8b9WEB/F4=
+golang.org/x/sys v0.0.0-20200519105757-fe76b779f299/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543 h1:E7g+9GITq07hpfrRu66IVDexMakfv52eLZ2CXBWiKr4=
+golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+google.golang.org/appengine v1.4.0 h1:/wp5JvzpHIxhs/dumFmF7BXTf3Z+dd4uXta4kVyO508=
+google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
+google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
+google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
+google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
+google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
+google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
+google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
+google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
+gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
+gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
+gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU=
+gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
+k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19 h1:7Nu2dTj82c6IaWvL7hImJzcXoTPz1MsSCH7r+0m6rfo=
+k8s.io/utils v0.0.0-20200619165400-6e3d28b6ed19/go.mod h1:jPW/WVKK9YHAvNhRxK0md/EJ228hCsBRufyofKtW8HA=
diff --git a/oauth2/oidc_endpoint_provider.go b/oauth2/oidc_endpoint_provider.go
new file mode 100644
index 0000000..32986b7
--- /dev/null
+++ b/oauth2/oidc_endpoint_provider.go
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/url"
+ "path"
+
+ "github.com/pkg/errors"
+)
+
+// OIDCWellKnownEndpoints holds the well known OIDC endpoints, as decoded from
+// the JSON document served at an issuer's .well-known/openid-configuration
+// path.
+type OIDCWellKnownEndpoints struct {
+ AuthorizationEndpoint string `json:"authorization_endpoint"`
+ TokenEndpoint string `json:"token_endpoint"`
+ DeviceAuthorizationEndpoint string `json:"device_authorization_endpoint"`
+}
+
+// GetOIDCWellKnownEndpointsFromIssuerURL gets the well known endpoints for the
+// passed in issuer url by fetching <issuerURL>/.well-known/openid-configuration
+// and decoding the JSON document it returns.
+//
+// An error is returned when the issuer url cannot be parsed, the GET request
+// fails, the server answers with a non-2xx status code, or the body is not
+// valid JSON.
+func GetOIDCWellKnownEndpointsFromIssuerURL(issuerURL string) (*OIDCWellKnownEndpoints, error) {
+	u, err := url.Parse(issuerURL)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not parse issuer url to build well known endpoints")
+	}
+	u.Path = path.Join(u.Path, ".well-known/openid-configuration")
+
+	r, err := http.Get(u.String())
+	if err != nil {
+		return nil, errors.Wrapf(err, "could not get well known endpoints from url %s", u.String())
+	}
+	defer r.Body.Close()
+
+	// Without this check an error page with a JSON body would decode into an
+	// empty endpoint set and be returned as if discovery had succeeded.
+	if r.StatusCode < 200 || r.StatusCode >= 300 {
+		return nil, errors.Errorf(
+			"could not get well known endpoints from url %s: unexpected status code %d",
+			u.String(), r.StatusCode)
+	}
+
+	var wkEndpoints OIDCWellKnownEndpoints
+	err = json.NewDecoder(r.Body).Decode(&wkEndpoints)
+	if err != nil {
+		return nil, errors.Wrap(err, "could not decode json body when getting well known endpoints")
+	}
+
+	return &wkEndpoints, nil
+}
diff --git a/oauth2/oidc_endpoint_provider_test.go b/oauth2/oidc_endpoint_provider_test.go
new file mode 100644
index 0000000..4ebce3b
--- /dev/null
+++ b/oauth2/oidc_endpoint_provider_test.go
@@ -0,0 +1,92 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package oauth2
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+)
+
+// Specs for GetOIDCWellKnownEndpointsFromIssuerURL: one success case against a
+// local httptest server and one case for each error path (url parsing, the
+// GET itself, and JSON decoding).
+var _ = Describe("GetOIDCWellKnownEndpointsFromIssuerURL", func() {
+ It("calls and gets the well known data from the correct endpoint for the issuer", func() {
+ var req *http.Request
+ wkEndpointsResp := OIDCWellKnownEndpoints{
+ AuthorizationEndpoint: "the-auth-endpoint", TokenEndpoint: "the-token-endpoint"}
+ responseBytes, err := json.Marshal(wkEndpointsResp)
+ Expect(err).ToNot(HaveOccurred())
+
+ // The handler records the incoming request so the requested path can be
+ // asserted after the call returns.
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ req = r
+
+ w.WriteHeader(http.StatusOK)
+ w.Write(responseBytes)
+
+ }))
+ defer ts.Close()
+
+ endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
+
+ Expect(err).ToNot(HaveOccurred())
+ Expect(*endpoints).To(Equal(wkEndpointsResp))
+ Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+ })
+
+ It("errors when url.Parse errors", func() {
+ endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("://")
+
+ Expect(err).To(HaveOccurred())
+ // NOTE(review): the suffix after the colon is produced by net/url; confirm
+ // the exact wording matches the Go version the tests run on.
+ Expect(err.Error()).To(Equal(
+ "could not parse issuer url to build well known endpoints: parse ://: missing protocol scheme"))
+ Expect(endpoints).To(BeNil())
+ })
+
+ It("errors when the get errors", func() {
+ endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL("https://")
+
+ Expect(err).To(HaveOccurred())
+ // NOTE(review): this expects the exact net/http and resolver error text and
+ // performs a real DNS lookup; confirm it is stable in the target environment.
+ Expect(err.Error()).To(Equal(
+ "could not get well known endpoints from url https://.well-known/openid-configuration: " +
+ "Get https://.well-known/openid-configuration: dial tcp: lookup .well-known: no such host"))
+ Expect(endpoints).To(BeNil())
+ })
+
+ It("errors when the json decoder errors", func() {
+ var req *http.Request
+
+ // The server answers 200 with a body that is not valid JSON.
+ ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
+ req = r
+
+ w.WriteHeader(http.StatusOK)
+ w.Write([]byte("<"))
+
+ }))
+ defer ts.Close()
+
+ endpoints, err := GetOIDCWellKnownEndpointsFromIssuerURL(ts.URL)
+
+ Expect(err).To(HaveOccurred())
+ Expect(err.Error()).To(Equal("could not decode json body when getting well" +
+ " known endpoints: invalid character '<' looking for beginning of value"))
+ Expect(endpoints).To(BeNil())
+ Expect(req.URL.Path).To(Equal("/.well-known/openid-configuration"))
+ })
+})
diff --git a/oauth2/store/keyring.go b/oauth2/store/keyring.go
new file mode 100644
index 0000000..fd49baf
--- /dev/null
+++ b/oauth2/store/keyring.go
@@ -0,0 +1,194 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+ "crypto/sha1"
+ "encoding/json"
+ "fmt"
+ "sync"
+
+ "github.com/99designs/keyring"
+ "github.com/apache/pulsar-client-go/oauth2"
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+// KeyringStore is a Store that keeps authorization grants in a
+// keyring.Keyring. Its exported methods hold lock while they access the
+// keyring.
+type KeyringStore struct {
+ kr keyring.Keyring
+ clock clock.Clock
+ lock sync.Mutex
+}
+
+// storedItem represents an item stored in the keyring
+type storedItem struct {
+ // Audience is hashed to form the keyring key.
+ Audience string
+ // UserName is stored as the label of the keyring item.
+ UserName string
+ // Grant is stored JSON-encoded as the data of the keyring item.
+ Grant oauth2.AuthorizationGrant
+}
+
+// NewKeyringStore returns a KeyringStore that persists grants in kr and uses
+// the real clock. The returned error is always nil.
+func NewKeyringStore(kr keyring.Keyring) (*KeyringStore, error) {
+	store := new(KeyringStore)
+	store.kr = kr
+	store.clock = clock.RealClock{}
+	return store, nil
+}
+
+var _ Store = &KeyringStore{}
+
+// SaveGrant stores grant in the keyring for the given audience. The user name
+// recorded with the item is the client email for client-credentials grants
+// and the name extracted from the token for device-code grants. Any other
+// grant type, or a grant missing the data its type requires, yields
+// ErrUnsupportedAuthData.
+func (f *KeyringStore) SaveGrant(audience string, grant oauth2.AuthorizationGrant) error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	var userName string
+	switch grant.Type {
+	case oauth2.GrantTypeClientCredentials:
+		if grant.ClientCredentials == nil {
+			return ErrUnsupportedAuthData
+		}
+		userName = grant.ClientCredentials.ClientEmail
+	case oauth2.GrantTypeDeviceCode:
+		if grant.Token == nil {
+			return ErrUnsupportedAuthData
+		}
+		name, err := oauth2.ExtractUserName(*grant.Token)
+		if err != nil {
+			return err
+		}
+		userName = name
+	default:
+		return ErrUnsupportedAuthData
+	}
+
+	return f.setItem(storedItem{
+		Audience: audience,
+		UserName: userName,
+		Grant:    grant,
+	})
+}
+
+// LoadGrant reads the grant stored for the given audience. A missing keyring
+// entry yields ErrNoAuthenticationData; a grant of an unknown type, or one
+// missing the data its type requires, yields ErrUnsupportedAuthData.
+func (f *KeyringStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	item, err := f.getItem(audience)
+	if err == keyring.ErrKeyNotFound {
+		return nil, ErrNoAuthenticationData
+	}
+	if err != nil {
+		return nil, err
+	}
+
+	// A grant is usable only when the payload matching its type is present.
+	usable := false
+	switch item.Grant.Type {
+	case oauth2.GrantTypeClientCredentials:
+		usable = item.Grant.ClientCredentials != nil
+	case oauth2.GrantTypeDeviceCode:
+		usable = item.Grant.Token != nil
+	}
+	if !usable {
+		return nil, ErrUnsupportedAuthData
+	}
+	return &item.Grant, nil
+}
+
+// WhoAmI returns the label recorded in the keyring for the given audience.
+// A missing entry yields ErrNoAuthenticationData.
+func (f *KeyringStore) WhoAmI(audience string) (string, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	meta, err := f.kr.GetMetadata(hashKeyringKey(audience))
+	switch {
+	case err == nil:
+		return meta.Label, nil
+	case err == keyring.ErrKeyNotFound:
+		return "", ErrNoAuthenticationData
+	default:
+		return "", fmt.Errorf("unable to get information from the keyring: %v", err)
+	}
+}
+
+// Logout removes every key from the keyring.
+//
+// Removal is best-effort: every key is attempted even if an earlier removal
+// failed, and the first failure is then reported. A failure on any key is
+// therefore never masked by a later successful removal.
+func (f *KeyringStore) Logout() error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	keys, err := f.kr.Keys()
+	if err != nil {
+		return fmt.Errorf("unable to get information from the keyring: %v", err)
+	}
+
+	var firstErr error
+	for _, key := range keys {
+		if removeErr := f.kr.Remove(key); removeErr != nil && firstErr == nil {
+			firstErr = removeErr
+		}
+	}
+	if firstErr != nil {
+		return fmt.Errorf("unable to update the keyring: %v", firstErr)
+	}
+	return nil
+}
+
+// getItem fetches the keyring entry for audience and decodes its data as an
+// authorization grant. Errors from the keyring are returned unchanged; data
+// that is not a valid JSON grant yields ErrUnsupportedAuthData.
+func (f *KeyringStore) getItem(audience string) (storedItem, error) {
+	entry, err := f.kr.Get(hashKeyringKey(audience))
+	if err != nil {
+		return storedItem{}, err
+	}
+
+	var decoded oauth2.AuthorizationGrant
+	if jsonErr := json.Unmarshal(entry.Data, &decoded); jsonErr != nil {
+		// the grant appears to be invalid
+		return storedItem{}, ErrUnsupportedAuthData
+	}
+
+	result := storedItem{
+		Audience: audience,
+		UserName: entry.Label,
+		Grant:    decoded,
+	}
+	return result, nil
+}
+
+// setItem writes item to the keyring: the key is the hashed audience, the
+// data is the JSON-encoded grant, and the label is the user name.
+func (f *KeyringStore) setItem(item storedItem) error {
+	encoded, err := json.Marshal(item.Grant)
+	if err != nil {
+		return err
+	}
+
+	// The two Keychain* flags are left at their zero value (false).
+	entry := keyring.Item{
+		Key:         hashKeyringKey(item.Audience),
+		Data:        encoded,
+		Label:       item.UserName,
+		Description: "authorization grant",
+	}
+	if err := f.kr.Set(entry); err != nil {
+		return fmt.Errorf("unable to update the keyring: %v", err)
+	}
+	return nil
+}
+
+// hashKeyringKey derives a safe keyring key from s: the lower-case hex
+// encoding of its SHA-1 digest.
+func hashKeyringKey(s string) string {
+	digest := sha1.Sum([]byte(s))
+	return fmt.Sprintf("%x", digest[:])
+}
diff --git a/oauth2/store/memory.go b/oauth2/store/memory.go
new file mode 100644
index 0000000..07c7594
--- /dev/null
+++ b/oauth2/store/memory.go
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+ "sync"
+
+ "github.com/apache/pulsar-client-go/oauth2"
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+)
+
+// MemoryStore is a Store that keeps authorization grants in an in-memory map
+// keyed by audience. Its methods hold lock while they access the map.
+type MemoryStore struct {
+ clock clock.Clock
+ lock sync.Mutex
+ grants map[string]*oauth2.AuthorizationGrant
+}
+
+// NewMemoryStore returns an empty MemoryStore that uses the real clock.
+func NewMemoryStore() Store {
+	store := new(MemoryStore)
+	store.clock = clock.RealClock{}
+	store.grants = make(map[string]*oauth2.AuthorizationGrant)
+	return store
+}
+
+var _ Store = &MemoryStore{}
+
+// SaveGrant records grant for the given audience, replacing any grant stored
+// for it before. It always returns nil.
+func (f *MemoryStore) SaveGrant(audience string, grant oauth2.AuthorizationGrant) error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	stored := &grant
+	f.grants[audience] = stored
+	return nil
+}
+
+// LoadGrant returns the grant stored for the given audience, or
+// ErrNoAuthenticationData when there is none.
+func (f *MemoryStore) LoadGrant(audience string) (*oauth2.AuthorizationGrant, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	if stored, found := f.grants[audience]; found {
+		return stored, nil
+	}
+	return nil, ErrNoAuthenticationData
+}
+
+// WhoAmI returns the user name of the grant stored for the given audience:
+// the client email for client-credentials grants, or the name extracted from
+// the token for device-code grants. A missing grant yields
+// ErrNoAuthenticationData; an unknown grant type, or a grant missing the data
+// its type requires, yields ErrUnsupportedAuthData.
+func (f *MemoryStore) WhoAmI(audience string) (string, error) {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	stored, found := f.grants[audience]
+	if !found {
+		return "", ErrNoAuthenticationData
+	}
+
+	if stored.Type == oauth2.GrantTypeClientCredentials && stored.ClientCredentials != nil {
+		return stored.ClientCredentials.ClientEmail, nil
+	}
+	if stored.Type == oauth2.GrantTypeDeviceCode && stored.Token != nil {
+		return oauth2.ExtractUserName(*stored.Token)
+	}
+	return "", ErrUnsupportedAuthData
+}
+
+// Logout discards every stored grant by replacing the map with an empty one.
+// It always returns nil.
+func (f *MemoryStore) Logout() error {
+	f.lock.Lock()
+	defer f.lock.Unlock()
+
+	f.grants = make(map[string]*oauth2.AuthorizationGrant)
+	return nil
+}
diff --git a/oauth2/store/store.go b/oauth2/store/store.go
new file mode 100644
index 0000000..55d4c9e
--- /dev/null
+++ b/oauth2/store/store.go
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package store
+
+import (
+ "errors"
+
+ "github.com/apache/pulsar-client-go/oauth2"
+)
+
+// ErrNoAuthenticationData indicates that stored authentication data is not available
+var ErrNoAuthenticationData = errors.New("authentication data is not available")
+
+// ErrUnsupportedAuthData indicates that stored authentication data is unusable
+var ErrUnsupportedAuthData = errors.New("authentication data is not usable")
+
+// Store is responsible for persisting authorization grants, keyed by audience
+type Store interface {
+ // SaveGrant stores an authorization grant for a given audience
+ SaveGrant(audience string, grant oauth2.AuthorizationGrant) error
+
+ // LoadGrant loads an authorization grant for a given audience
+ LoadGrant(audience string) (*oauth2.AuthorizationGrant, error)
+
+ // WhoAmI returns the current user name for a given audience (or an error
+ // if nobody is logged in)
+ WhoAmI(audience string) (string, error)
+
+ // Logout deletes all stored credentials
+ Logout() error
+}
diff --git a/pulsar/client.go b/pulsar/client.go
index d4af906..460b275 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -62,6 +62,11 @@ func NewAuthenticationAthenz(authParams map[string]string) Authentication {
return athenz
}
+// NewAuthenticationOAuth2 creates an OAuth 2.0 Authentication from the given
+// parameters.
+// NOTE(review): the error returned by auth.NewAuthenticationOAuth2WithParams
+// is discarded here, so a failure to build the provider is not reported to
+// the caller.
+func NewAuthenticationOAuth2(authParams map[string]string) Authentication {
+ oauth, _ := auth.NewAuthenticationOAuth2WithParams(authParams)
+ return oauth
+}
+
// Builder interface that is used to construct a Pulsar Client instance.
type ClientOptions struct {
// Configure the service URL for the Pulsar service.
diff --git a/pulsar/client_impl_test.go b/pulsar/client_impl_test.go
index 308c8e0..276e4fb 100644
--- a/pulsar/client_impl_test.go
+++ b/pulsar/client_impl_test.go
@@ -20,9 +20,13 @@ package pulsar
import (
"fmt"
"io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "os"
"testing"
"time"
+ "github.com/apache/pulsar-client-go/pulsar/internal/auth"
"github.com/stretchr/testify/assert"
)
@@ -219,6 +223,96 @@ func TestTokenAuthFromFile(t *testing.T) {
client.Close()
}
+// mockOAuthServer will mock a oauth service for the tests
+func mockOAuthServer() *httptest.Server {
+ // prepare a port for the mocked server
+ server := httptest.NewUnstartedServer(http.DefaultServeMux)
+
+ // mock the used REST path for the tests
+ mockedHandler := http.NewServeMux()
+ mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, request *http.Request) {
+ s := fmt.Sprintf(`{
+ "issuer":"%s",
+ "authorization_endpoint":"%s/authorize",
+ "token_endpoint":"%s/oauth/token",
+ "device_authorization_endpoint":"%s/oauth/device/code"
+}`, server.URL, server.URL, server.URL, server.URL)
+ fmt.Fprintln(writer, s)
+ })
+ mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, request *http.Request) {
+ fmt.Fprintln(writer, "{\n"+
+ " \"access_token\": \"eyJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ0b2tlbi1wcmluY2lwYWwifQ."+
+ "tSfgR8l7dKC6LoWCxQgNkuSB8our7xV_nAM7wpgCbG4\",\n"+
+ " \"token_type\": \"Bearer\"\n}")
+ })
+ mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, request *http.Request) {
+ fmt.Fprintln(writer, "true")
+ })
+
+ server.Config.Handler = mockedHandler
+ server.Start()
+
+ return server
+}
+
+// mockKeyFile writes a temporary service-account key file into the current
+// working directory and returns its path. The file's "issuer_url" field is set
+// to server. On success the caller is responsible for removing the file.
+//
+// The file handle is always closed before returning, and a file that could not
+// be written completely is removed so that a failure leaves nothing behind.
+func mockKeyFile(server string) (string, error) {
+	pwd, err := os.Getwd()
+	if err != nil {
+		return "", err
+	}
+	kf, err := ioutil.TempFile(pwd, "test_oauth2")
+	if err != nil {
+		return "", err
+	}
+
+	_, writeErr := fmt.Fprintf(kf, `{
+ "type":"sn_service_account",
+ "client_id":"client-id",
+ "client_secret":"client-secret",
+ "client_email":"oauth@test.org",
+ "issuer_url":"%s"
+}`, server)
+	// Close unconditionally; a close error can also mean the data was not flushed.
+	closeErr := kf.Close()
+	if writeErr == nil {
+		writeErr = closeErr
+	}
+	if writeErr != nil {
+		// Best-effort cleanup: the write/close error is the one worth reporting.
+		_ = os.Remove(kf.Name())
+		return "", writeErr
+	}
+
+	return kf.Name(), nil
+}
+
+// TestOAuth2Auth checks that a client configured with OAuth 2.0 client
+// credentials (served by a local mock issuer) can create a producer.
+func TestOAuth2Auth(t *testing.T) {
+	server := mockOAuthServer()
+	defer server.Close()
+	kf, err := mockKeyFile(server.URL)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer os.Remove(kf)
+
+	params := map[string]string{
+		auth.ConfigParamType:      auth.ConfigParamTypeClientCredentials,
+		auth.ConfigParamIssuerURL: server.URL,
+		auth.ConfigParamClientID:  "client-id",
+		auth.ConfigParamAudience:  "audience",
+		auth.ConfigParamKeyFile:   kf,
+	}
+
+	// NewAuthenticationOAuth2 drops the constructor error, so a nil result is
+	// the only sign that the OAuth 2.0 provider could not be built.
+	oauth := NewAuthenticationOAuth2(params)
+	if oauth == nil {
+		t.Fatal("NewAuthenticationOAuth2 returned nil")
+	}
+
+	client, err := NewClient(ClientOptions{
+		URL:            serviceURL,
+		Authentication: oauth,
+	})
+	if err != nil {
+		// Stop here: client is unusable and calling methods on it would panic.
+		t.Fatal(err)
+	}
+	// Deferred so the client is released even if an assertion below fails.
+	defer client.Close()
+
+	producer, err := client.CreateProducer(ProducerOptions{
+		Topic: newAuthTopicName(),
+	})
+
+	assert.NoError(t, err)
+	assert.NotNil(t, producer)
+}
+
func TestTopicPartitions(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: "pulsar://localhost:6650",
diff --git a/pulsar/internal/auth/oauth2.go b/pulsar/internal/auth/oauth2.go
new file mode 100644
index 0000000..9ee63ab
--- /dev/null
+++ b/pulsar/internal/auth/oauth2.go
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+ "crypto/tls"
+ "fmt"
+
+ "github.com/apache/pulsar-client-go/oauth2"
+ "github.com/apache/pulsar-client-go/oauth2/cache"
+ "github.com/apache/pulsar-client-go/oauth2/clock"
+ "github.com/apache/pulsar-client-go/oauth2/store"
+)
+
+// Keys (and one value) of the string parameter map accepted by
+// NewAuthenticationOAuth2WithParams.
+const (
+ // ConfigParamType is the key selecting the OAuth 2.0 flow.
+ ConfigParamType = "type"
+ // ConfigParamTypeClientCredentials is the only ConfigParamType value
+ // accepted by NewAuthenticationOAuth2WithParams.
+ ConfigParamTypeClientCredentials = "client_credentials"
+ // ConfigParamIssuerURL is the key whose value becomes the issuer endpoint.
+ ConfigParamIssuerURL = "issuerUrl"
+ // ConfigParamAudience is the key whose value becomes the issuer audience;
+ // the audience is also the key under which the grant is stored.
+ ConfigParamAudience = "audience"
+ // ConfigParamKeyFile is the key whose value is passed as the KeyFile option
+ // of the client credentials flow.
+ ConfigParamKeyFile = "privateKey"
+ // ConfigParamClientID is the key whose value becomes the issuer client ID.
+ ConfigParamClientID = "clientId"
+)
+
+// oauth2AuthProvider is an authentication provider that obtains access tokens
+// from a stored OAuth 2.0 authorization grant.
+type oauth2AuthProvider struct {
+ // clock is handed to the grant refreshers created by getRefresher.
+ clock clock.Clock
+ // issuer supplies the audience used to look up the stored grant.
+ issuer oauth2.Issuer
+ // store holds the authorization grant loaded by Init.
+ store store.Store
+ // source is set by Init; while nil, GetData returns no data (anonymous access).
+ source cache.CachingTokenSource
+}
+
+// NewAuthenticationOAuth2WithParams returns a Provider configured from a string map.
+//
+// The map is read through the ConfigParam* keys. Only the client credentials
+// type is supported: the flow is authorized for the configured audience and the
+// resulting grant is saved in a fresh in-memory store that backs the provider.
+// Any other type value, or any failure along the way, yields a nil Provider and
+// an error.
+func NewAuthenticationOAuth2WithParams(params map[string]string) (Provider, error) {
+	issuer := oauth2.Issuer{
+		IssuerEndpoint: params[ConfigParamIssuerURL],
+		ClientID:       params[ConfigParamClientID],
+		Audience:       params[ConfigParamAudience],
+	}
+
+	// initialize a store of authorization grants
+	st := store.NewMemoryStore()
+
+	authType := params[ConfigParamType]
+	if authType != ConfigParamTypeClientCredentials {
+		return nil, fmt.Errorf("unsupported authentication type: %s", authType)
+	}
+
+	flowOptions := oauth2.ClientCredentialsFlowOptions{
+		KeyFile:          params[ConfigParamKeyFile],
+		AdditionalScopes: nil,
+	}
+	flow, err := oauth2.NewDefaultClientCredentialsFlow(flowOptions)
+	if err != nil {
+		return nil, err
+	}
+
+	grant, err := flow.Authorize(issuer.Audience)
+	if err != nil {
+		return nil, err
+	}
+
+	if err := st.SaveGrant(issuer.Audience, *grant); err != nil {
+		return nil, err
+	}
+
+	return NewAuthenticationOAuth2(issuer, st), nil
+}
+
+// NewAuthenticationOAuth2 returns a Provider for the given issuer that reads its
+// authorization grant from store. The provider uses the real clock and has no
+// token source until Init is called.
+func NewAuthenticationOAuth2(
+	issuer oauth2.Issuer,
+	store store.Store) Provider {
+
+	provider := new(oauth2AuthProvider)
+	provider.clock = clock.RealClock{}
+	provider.issuer = issuer
+	provider.store = store
+	return provider
+}
+
+// Init loads the grant stored for the issuer's audience and wires up a caching
+// token source for it. When the store reports store.ErrNoAuthenticationData,
+// Init succeeds and leaves the provider without a token source.
+func (p *oauth2AuthProvider) Init() error {
+	audience := p.issuer.Audience
+
+	grant, err := p.store.LoadGrant(audience)
+	switch {
+	case err == store.ErrNoAuthenticationData:
+		// Nothing stored: not an error, the provider simply stays sourceless.
+		return nil
+	case err != nil:
+		return err
+	}
+
+	refresher, err := p.getRefresher(grant.Type)
+	if err != nil {
+		return err
+	}
+
+	tokenSource, err := cache.NewDefaultTokenCache(p.store, audience, refresher)
+	if err != nil {
+		return err
+	}
+
+	p.source = tokenSource
+	return nil
+}
+
+// Name returns the fixed string "token".
+// NOTE(review): presumably this is the auth method name reported to the broker,
+// so the OAuth 2.0 access token is handled like a plain token -- confirm.
+func (p *oauth2AuthProvider) Name() string {
+ return "token"
+}
+
+// GetTLSCertificate always returns a nil certificate and a nil error; this
+// provider supplies no TLS client certificate.
+func (p *oauth2AuthProvider) GetTLSCertificate() (*tls.Certificate, error) {
+ return nil, nil
+}
+
+// GetData returns the current access token as bytes. If no token source has
+// been set up (see Init), it returns nil data and a nil error.
+func (p *oauth2AuthProvider) GetData() ([]byte, error) {
+	src := p.source
+	if src == nil {
+		// anonymous access
+		return nil, nil
+	}
+
+	tok, err := src.Token()
+	if err != nil {
+		return nil, err
+	}
+
+	data := []byte(tok.AccessToken)
+	return data, nil
+}
+
+// Close does nothing and always returns nil.
+func (p *oauth2AuthProvider) Close() error {
+ return nil
+}
+
+// getRefresher returns the default grant refresher for grant type t, built with
+// the provider's clock. Grant types other than client credentials and device
+// code yield store.ErrUnsupportedAuthData.
+func (p *oauth2AuthProvider) getRefresher(t oauth2.AuthorizationGrantType) (oauth2.AuthorizationGrantRefresher, error) {
+	if t == oauth2.GrantTypeClientCredentials {
+		return oauth2.NewDefaultClientCredentialsGrantRefresher(p.clock)
+	}
+	if t == oauth2.GrantTypeDeviceCode {
+		return oauth2.NewDefaultDeviceAuthorizationGrantRefresher(p.clock)
+	}
+	return nil, store.ErrUnsupportedAuthData
+}
diff --git a/pulsar/internal/auth/oauth2_test.go b/pulsar/internal/auth/oauth2_test.go
new file mode 100644
index 0000000..c8a4830
--- /dev/null
+++ b/pulsar/internal/auth/oauth2_test.go
@@ -0,0 +1,117 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+ "fmt"
+ "io/ioutil"
+ "net/http"
+ "net/http/httptest"
+ "os"
+ "testing"
+
+ "github.com/pkg/errors"
+ "github.com/stretchr/testify/assert"
+)
+
+// mockOAuthServer will mock a oauth service for the tests
+func mockOAuthServer() *httptest.Server {
+ // prepare a port for the mocked server
+ server := httptest.NewUnstartedServer(http.DefaultServeMux)
+
+ // mock the used REST path for the tests
+ mockedHandler := http.NewServeMux()
+ mockedHandler.HandleFunc("/.well-known/openid-configuration", func(writer http.ResponseWriter, request *http.Request) {
+ s := fmt.Sprintf(`{
+ "issuer":"%s",
+ "authorization_endpoint":"%s/authorize",
+ "token_endpoint":"%s/oauth/token",
+ "device_authorization_endpoint":"%s/oauth/device/code"
+}`, server.URL, server.URL, server.URL, server.URL)
+ fmt.Fprintln(writer, s)
+ })
+ mockedHandler.HandleFunc("/oauth/token", func(writer http.ResponseWriter, request *http.Request) {
+ fmt.Fprintln(writer, "{\n \"access_token\": \"token-content\",\n \"token_type\": \"Bearer\"\n}")
+ })
+ mockedHandler.HandleFunc("/authorize", func(writer http.ResponseWriter, request *http.Request) {
+ fmt.Fprintln(writer, "true")
+ })
+
+ server.Config.Handler = mockedHandler
+ server.Start()
+
+ return server
+}
+
+// mockKeyFile writes a temporary service-account key file into the current
+// working directory and returns its path. The file's "issuer_url" field is set
+// to server. On success the caller is responsible for removing the file.
+//
+// The file handle is always closed before returning, and a file that could not
+// be written completely is removed so that a failure leaves nothing behind.
+func mockKeyFile(server string) (string, error) {
+	pwd, err := os.Getwd()
+	if err != nil {
+		return "", err
+	}
+	kf, err := ioutil.TempFile(pwd, "test_oauth2")
+	if err != nil {
+		return "", err
+	}
+
+	_, writeErr := fmt.Fprintf(kf, `{
+ "type":"sn_service_account",
+ "client_id":"client-id",
+ "client_secret":"client-secret",
+ "client_email":"oauth@test.org",
+ "issuer_url":"%s"
+}`, server)
+	// Close unconditionally; a close error can also mean the data was not flushed.
+	closeErr := kf.Close()
+	if writeErr == nil {
+		writeErr = closeErr
+	}
+	if writeErr != nil {
+		// Best-effort cleanup: the write/close error is the one worth reporting.
+		_ = os.Remove(kf.Name())
+		return "", writeErr
+	}
+
+	return kf.Name(), nil
+}
+
+// TestNewAuthenticationOAuth2WithParams builds a provider from a parameter map
+// against the mocked issuer and expects GetData to return the mocked token.
+func TestNewAuthenticationOAuth2WithParams(t *testing.T) {
+	server := mockOAuthServer()
+	defer server.Close()
+
+	kf, err := mockKeyFile(server.URL)
+	defer os.Remove(kf)
+	if err != nil {
+		t.Fatal(errors.Wrap(err, "create mocked key file failed"))
+	}
+
+	provider, err := NewAuthenticationOAuth2WithParams(map[string]string{
+		ConfigParamType:      ConfigParamTypeClientCredentials,
+		ConfigParamIssuerURL: server.URL,
+		ConfigParamClientID:  "client-id",
+		ConfigParamAudience:  "audience",
+		ConfigParamKeyFile:   kf,
+	})
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	if err := provider.Init(); err != nil {
+		t.Fatal(err)
+	}
+
+	data, err := provider.GetData()
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	assert.Equal(t, "token-content", string(data))
+}
diff --git a/pulsar/internal/auth/token.go b/pulsar/internal/auth/token.go
index e6a6d97..d8d0cd3 100644
--- a/pulsar/internal/auth/token.go
+++ b/pulsar/internal/auth/token.go
@@ -102,6 +102,6 @@ func (p *tokenAuthProvider) GetData() ([]byte, error) {
return []byte(t), nil
}
-func (tokenAuthProvider) Close() error {
+func (p *tokenAuthProvider) Close() error {
return nil
}