You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/04/25 13:38:36 UTC

[pulsar-client-go] branch master updated: [auth] Add Athenz authentication provider (#227)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f822ef  [auth] Add Athenz authentication provider (#227)
0f822ef is described below

commit 0f822efff1c1a78a517982aed43d56e48eed3778
Author: Yuri Mizushima <eq...@gmail.com>
AuthorDate: Sat Apr 25 22:38:30 2020 +0900

    [auth] Add Athenz authentication provider (#227)
    
    * feat: add athenz authentication provider
    
    * style: fix style errors
---
 go.mod                              |   3 +-
 go.sum                              |  39 ++++++++
 pulsar/client.go                    |   5 +
 pulsar/client_impl.go               |   4 +
 pulsar/internal/auth/athenz.go      | 179 ++++++++++++++++++++++++++++++++++++
 pulsar/internal/auth/athenz_test.go | 130 ++++++++++++++++++++++++++
 pulsar/internal/auth/provider.go    |   8 +-
 7 files changed, 366 insertions(+), 2 deletions(-)

diff --git a/go.mod b/go.mod
index bdbadf1..34bae4e 100644
--- a/go.mod
+++ b/go.mod
@@ -15,5 +15,6 @@ require (
 	github.com/spaolacci/murmur3 v1.1.0
 	github.com/spf13/cobra v0.0.3
 	github.com/spf13/pflag v1.0.3 // indirect
-	github.com/stretchr/testify v1.3.0
+	github.com/stretchr/testify v1.4.0
+	github.com/yahoo/athenz v1.8.55
 )
diff --git a/go.sum b/go.sum
index 3d8be2a..f66879a 100644
--- a/go.sum
+++ b/go.sum
@@ -1,16 +1,27 @@
+github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/ardielle/ardielle-go v1.5.2 h1:TilHTpHIQJ27R1Tl/iITBzMwiUGSlVfiVhwDNGM3Zj4=
+github.com/ardielle/ardielle-go v1.5.2/go.mod h1:I4hy1n795cUhaVt/ojz83SNVCYIGsAFAONtv2Dr7HUI=
+github.com/ardielle/ardielle-tools v1.5.4/go.mod h1:oZN+JRMnqGiIhrzkRN9l26Cej9dEx4jeNG6A+AdkShk=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6 h1:KXlsf+qt/X5ttPGEjR0tPH1xaWWoKBEg9Q1THAj2h3I=
 github.com/beefsack/go-rate v0.0.0-20180408011153-efa7637bb9b6/go.mod h1:6YNgTHLutezwnBvyneBbwvB8C82y3dcoOj5EQJIdGXA=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b h1:AP/Y7sqYicnjGDfD5VcY4CIfh1hRXBUavxrvELjTiOE=
 github.com/bmizerany/perks v0.0.0-20141205001514-d9a9656a3a4b/go.mod h1:ac9efd0D1fsDb3EJvhqgXRbFx7bs2wqZ10HQPeU8U/Q=
+github.com/boynton/repl v0.0.0-20170116235056-348863958e3e/go.mod h1:Crc/GCZ3NXDVCio7Yr0o+SSrytpcFhLmVCIzi0s49t4=
 github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
+github.com/dimfeld/httptreemux v5.0.1+incompatible h1:Qj3gVcDNoOthBAqftuD596rm4wg/adLLz5xh5CmpiCA=
+github.com/dimfeld/httptreemux v5.0.1+incompatible/go.mod h1:rbUlSV+CCpv/SuqUTP/8Bk2O3LyUV436/yaRGkhP6Z0=
 github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg=
 github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
 github.com/google/go-cmp v0.3.0 h1:crn/baboCvb5fXaQ0IJ1SGTsTVrWpDsCWC8EGETZijY=
 github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
+github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
+github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
 github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM=
 github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
+github.com/jawher/mow.cli v1.0.4/go.mod h1:5hQj2V8g+qYmLUVWqu4Wuja1pI57M83EChYLVZ0sMKk=
+github.com/jawher/mow.cli v1.1.0/go.mod h1:aNaQlc7ozF3vw6IJ2dHjp2ZFiA4ozMIYY6PyuRJwlUg=
 github.com/klauspost/compress v1.9.2 h1:LfVyl+ZlLlLDeQ/d2AqfGIIH4qEDu0Ed2S5GyhCWIWY=
 github.com/klauspost/compress v1.9.2/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A=
 github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk=
@@ -31,8 +42,36 @@ github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg=
 github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4=
 github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
 github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
+github.com/stretchr/objx v0.2.0 h1:Hbg2NidpLE8veEBkEZTL3CvlkUIVzuU9jDplZO54c48=
+github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
 github.com/stretchr/testify v1.3.0 h1:TivCn/peBQ7UY8ooIcPgZFpTNSz0Q2U6UrFlUfqbe0Q=
 github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
+github.com/stretchr/testify v1.4.0 h1:2E4SXV/wtOkTonXsotYi4li6zVWxYlZuYNCXe9XRJyk=
+github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
+github.com/yahoo/athenz v1.8.55 h1:xGhxN3yLq334APyn0Zvcc+aqu78Q7BBhYJevM3EtTW0=
+github.com/yahoo/athenz v1.8.55/go.mod h1:G7LLFUH7Z/r4QAB7FfudfuA7Am/eCzO1GlzBhDL6Kv0=
+golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
+golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
+golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
+golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
+golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M=
+golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
+golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
+golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk=
+golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
+golang.org/x/tools v0.0.0-20190808195139-e713427fea3f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
+golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
+gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
+gopkg.in/natefinch/lumberjack.v2 v2.0.0/go.mod h1:l0ndWWf7gzL7RNwBG7wST/UCcT4T24xpD6X8LsfU/+k=
+gopkg.in/square/go-jose.v2 v2.4.1/go.mod h1:M9dMgbHiYLoDGQrXy7OpJDJWiKiU//h+vD76mk0e1AI=
+gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
+gopkg.in/yaml.v2 v2.2.7 h1:VUgggvou5XRW9mHwD/yXxIYSMtY0zoKQf/v226p2nyo=
+gopkg.in/yaml.v2 v2.2.7/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
diff --git a/pulsar/client.go b/pulsar/client.go
index 5a98496..53de468 100644
--- a/pulsar/client.go
+++ b/pulsar/client.go
@@ -57,6 +57,11 @@ func NewAuthenticationTLS(certificatePath string, privateKeyPath string) Authent
 	return auth.NewAuthenticationTLS(certificatePath, privateKeyPath)
 }
 
+func NewAuthenticationAthenz(authParams map[string]string) Authentication {
+	athenz, _ := auth.NewAuthenticationAthenzWithParams(authParams)
+	return athenz
+}
+
 // Builder interface that is used to construct a Pulsar Client instance.
 type ClientOptions struct {
 	// Configure the service URL for the Pulsar service.
diff --git a/pulsar/client_impl.go b/pulsar/client_impl.go
index f768389..02d9883 100644
--- a/pulsar/client_impl.go
+++ b/pulsar/client_impl.go
@@ -80,6 +80,10 @@ func newClient(options ClientOptions) (Client, error) {
 			return nil, errors.New("invalid auth provider interface")
 		}
 	}
+	err = authProvider.Init()
+	if err != nil {
+		return nil, err
+	}
 
 	connectionTimeout := options.ConnectionTimeout
 	if connectionTimeout.Nanoseconds() == 0 {
diff --git a/pulsar/internal/auth/athenz.go b/pulsar/internal/auth/athenz.go
new file mode 100644
index 0000000..3d650f7
--- /dev/null
+++ b/pulsar/internal/auth/athenz.go
@@ -0,0 +1,179 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+	"crypto/tls"
+	"encoding/base64"
+	"errors"
+	"io/ioutil"
+	"regexp"
+	"strings"
+	"time"
+
+	zms "github.com/yahoo/athenz/libs/go/zmssvctoken"
+	zts "github.com/yahoo/athenz/libs/go/ztsroletoken"
+)
+
+const (
+	minExpire = 2 * time.Hour
+	maxExpire = 24 * time.Hour
+)
+
+type athenzAuthProvider struct {
+	providerDomain     string
+	tenantDomain       string
+	tenantService      string
+	privateKey         string
+	keyID              string
+	principalHeader    string
+	ztsURL             string
+	tokenBuilder       zms.TokenBuilder
+	roleToken          zts.RoleToken
+	zmsNewTokenBuilder func(domain, name string, privateKeyPEM []byte, keyVersion string) (zms.TokenBuilder, error)
+	ztsNewRoleToken    func(tok zms.Token, domain string, opts zts.RoleTokenOptions) zts.RoleToken
+}
+
+type privateKeyURI struct {
+	Scheme                   string
+	MediaTypeAndEncodingType string
+	Data                     string
+	Path                     string
+}
+
+func NewAuthenticationAthenzWithParams(params map[string]string) (Provider, error) {
+	return NewAuthenticationAthenz(
+		params["providerDomain"],
+		params["tenantDomain"],
+		params["tenantService"],
+		params["privateKey"],
+		params["keyId"],
+		params["principalHeader"],
+		params["ztsUrl"],
+	), nil
+}
+
+func NewAuthenticationAthenz(
+	providerDomain string,
+	tenantDomain string,
+	tenantService string,
+	privateKey string,
+	keyID string,
+	principalHeader string,
+	ztsURL string) Provider {
+	var fixedKeyID string
+	if keyID == "" {
+		fixedKeyID = "0"
+	} else {
+		fixedKeyID = keyID
+	}
+	ztsNewRoleToken := func(tok zms.Token, domain string, opts zts.RoleTokenOptions) zts.RoleToken {
+		return zts.RoleToken(zts.NewRoleToken(tok, domain, opts))
+	}
+
+	return &athenzAuthProvider{
+		providerDomain:     providerDomain,
+		tenantDomain:       tenantDomain,
+		tenantService:      tenantService,
+		privateKey:         privateKey,
+		keyID:              fixedKeyID,
+		principalHeader:    principalHeader,
+		ztsURL:             strings.TrimSuffix(ztsURL, "/"),
+		zmsNewTokenBuilder: zms.NewTokenBuilder,
+		ztsNewRoleToken:    ztsNewRoleToken,
+	}
+}
+
+func (p *athenzAuthProvider) Init() error {
+	uriSt := parseURI(p.privateKey)
+	var keyData []byte
+
+	if uriSt.Scheme == "data" {
+		if uriSt.MediaTypeAndEncodingType != "application/x-pem-file;base64" {
+			return errors.New("Unsupported mediaType or encodingType: " + uriSt.MediaTypeAndEncodingType)
+		}
+		key, err := base64.StdEncoding.DecodeString(uriSt.Data)
+		if err != nil {
+			return err
+		}
+		keyData = key
+	} else if uriSt.Scheme == "file" {
+		key, err := ioutil.ReadFile(uriSt.Path)
+		if err != nil {
+			return err
+		}
+		keyData = key
+	} else {
+		return errors.New("Unsupported URI Scheme: " + uriSt.Scheme)
+	}
+
+	tb, err := p.zmsNewTokenBuilder(p.tenantDomain, p.tenantService, keyData, p.keyID)
+	if err != nil {
+		return err
+	}
+	p.tokenBuilder = tb
+
+	roleToken := p.ztsNewRoleToken(p.tokenBuilder.Token(), p.providerDomain, zts.RoleTokenOptions{
+		BaseZTSURL: p.ztsURL + "/zts/v1",
+		MinExpire:  minExpire,
+		MaxExpire:  maxExpire,
+		AuthHeader: p.principalHeader,
+	})
+	p.roleToken = roleToken
+
+	return nil
+}
+
+func (p *athenzAuthProvider) Name() string {
+	return "athenz"
+}
+
+func (p *athenzAuthProvider) GetTLSCertificate() (*tls.Certificate, error) {
+	return nil, nil
+}
+
+func (p *athenzAuthProvider) GetData() ([]byte, error) {
+	tok, err := p.roleToken.RoleTokenValue()
+	if err != nil {
+		return nil, err
+	}
+
+	return []byte(tok), nil
+}
+
+func (p *athenzAuthProvider) Close() error {
+	return nil
+}
+
+func parseURI(uri string) privateKeyURI {
+	var uriSt privateKeyURI
+	// scheme mediatype[;base64] path file
+	const expression = `^(?:([^:/?#]+):)(?:([;/\\\-\w]*),)?(?:/{0,2}((?:[^?#/]*/)*))?([^?#]*)`
+
+	// when expression cannot be parsed, then panics
+	re := regexp.MustCompile(expression)
+	if re.MatchString(uri) {
+		groups := re.FindStringSubmatch(uri)
+		uriSt.Scheme = groups[1]
+		uriSt.MediaTypeAndEncodingType = groups[2]
+		uriSt.Data = groups[4]
+		uriSt.Path = groups[3] + groups[4]
+	}
+
+	return uriSt
+}
diff --git a/pulsar/internal/auth/athenz_test.go b/pulsar/internal/auth/athenz_test.go
new file mode 100644
index 0000000..9fbb670
--- /dev/null
+++ b/pulsar/internal/auth/athenz_test.go
@@ -0,0 +1,130 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package auth
+
+import (
+	"bytes"
+	"errors"
+	"io/ioutil"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+	mock "github.com/stretchr/testify/mock"
+	zms "github.com/yahoo/athenz/libs/go/zmssvctoken"
+	zts "github.com/yahoo/athenz/libs/go/ztsroletoken"
+)
+
+const tlsClientKeyPath = "../../../integration-tests/certs/client-key.pem"
+
+type MockTokenBuilder struct {
+	mock.Mock
+}
+
+type MockToken struct {
+	mock.Mock
+}
+
+type MockRoleToken struct {
+	mock.Mock
+}
+
+func (m *MockTokenBuilder) SetExpiration(t time.Duration) {
+}
+func (m *MockTokenBuilder) SetHostname(h string) {
+}
+func (m *MockTokenBuilder) SetIPAddress(ip string) {
+}
+func (m *MockTokenBuilder) SetKeyService(keyService string) {
+}
+func (m *MockTokenBuilder) Token() zms.Token {
+	result := m.Called()
+	return result.Get(0).(zms.Token)
+}
+
+func (m *MockToken) Value() (string, error) {
+	result := m.Called()
+	return result.Get(0).(string), result.Error(1)
+}
+
+func (m *MockRoleToken) RoleTokenValue() (string, error) {
+	result := m.Called()
+	return result.Get(0).(string), result.Error(1)
+}
+
+func MockZmsNewTokenBuilder(domain, name string, privateKeyPEM []byte, keyVersion string) (zms.TokenBuilder, error) {
+	// assertion
+	key, err := ioutil.ReadFile(tlsClientKeyPath)
+	if err != nil {
+		return nil, err
+	}
+	if domain != "pulsar.test.tenant" ||
+		name != "service" ||
+		!bytes.Equal(key, privateKeyPEM) ||
+		keyVersion != "0" {
+		return nil, errors.New("Assertion error")
+	}
+
+	mockToken := new(MockToken)
+	mockToken.On("Value").Return("mockPrincipalToken", nil)
+	mockTokenBuilder := new(MockTokenBuilder)
+	mockTokenBuilder.On("Token").Return(mockToken)
+	return mockTokenBuilder, nil
+}
+
+func MockZtsNewRoleToken(tok zms.Token, domain string, opts zts.RoleTokenOptions) zts.RoleToken {
+	// assertion
+	token, err := tok.Value()
+	if err != nil {
+		return nil
+	}
+	if token != "mockPrincipalToken" ||
+		domain != "pulsar.test.provider" ||
+		opts.BaseZTSURL != "http://localhost:9999/zts/v1" ||
+		opts.AuthHeader != "" {
+		return nil
+	}
+
+	mockRoleToken := new(MockRoleToken)
+	mockRoleToken.On("RoleTokenValue").Return("mockRoleToken", nil)
+	return mockRoleToken
+}
+
+func TestAthenzAuth(t *testing.T) {
+	privateKey := "file://" + tlsClientKeyPath
+	provider := NewAuthenticationAthenz(
+		"pulsar.test.provider",
+		"pulsar.test.tenant",
+		"service",
+		privateKey,
+		"",
+		"",
+		"http://localhost:9999")
+
+	// inject mock function
+	athenz := provider.(*athenzAuthProvider)
+	athenz.zmsNewTokenBuilder = MockZmsNewTokenBuilder
+	athenz.ztsNewRoleToken = MockZtsNewRoleToken
+
+	err := athenz.Init()
+	assert.NoError(t, err)
+
+	data, err := athenz.GetData()
+	assert.Equal(t, []byte("mockRoleToken"), data)
+	assert.NoError(t, err)
+}
diff --git a/pulsar/internal/auth/provider.go b/pulsar/internal/auth/provider.go
index 8868df5..220ff14 100644
--- a/pulsar/internal/auth/provider.go
+++ b/pulsar/internal/auth/provider.go
@@ -19,6 +19,7 @@ package auth
 
 import (
 	"crypto/tls"
+	"encoding/json"
 	"fmt"
 	"io"
 
@@ -59,11 +60,16 @@ func NewProvider(name string, params string) (Provider, error) {
 	case "token", "org.apache.pulsar.client.impl.auth.AuthenticationToken":
 		return NewAuthenticationTokenWithParams(m)
 
+	case "athenz", "org.apache.pulsar.client.impl.auth.AuthenticationAthenz":
+		return NewAuthenticationAthenzWithParams(m)
+
 	default:
 		return nil, errors.New(fmt.Sprintf("invalid auth provider '%s'", name))
 	}
 }
 
 func parseParams(params string) map[string]string {
-	return nil
+	var mapString map[string]string
+	json.Unmarshal([]byte(params), &mapString)
+	return mapString
 }